ITABLES¶
In [1]:
import time
start_time = time.time() # Record the start time
# to enable itables
from itables import init_notebook_mode
init_notebook_mode(all_interactive=True)
DELTA SPARK CONFIGURATION¶
In [2]:
import os
import shutil
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
def ensure_directories_exist(warehouse_dir, metastore_db_path):
"""
Ensures the necessary directories for the warehouse and metastore exist.
Parameters:
warehouse_dir (str): Path to the warehouse directory (Spark catalog).
metastore_db_path (str): Path to the metastore database.
"""
os.makedirs(warehouse_dir, exist_ok=True)
os.makedirs(os.path.dirname(metastore_db_path), exist_ok=True)
def create_spark_session(
app_name="DeltaCatalog",
warehouse_dir="./warehouse-spark/spark_catalog",
metastore_db_path="./warehouse-spark/metastore_db",
):
"""
Creates and initializes a SparkSession with Delta Lake support and persistent metastore.
Parameters:
app_name (str): Name of the Spark application.
warehouse_dir (str): Path to the Spark catalog warehouse directory.
metastore_db_path (str): Path to the persistent metastore database (Derby).
Returns:
SparkSession: Configured SparkSession instance.
"""
# Ensure required directories exist
ensure_directories_exist(warehouse_dir, metastore_db_path)
# Configure SparkSession with Delta Lake
builder = (
SparkSession.builder.appName(app_name)
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
.config("spark.sql.parquet.compression.codec", "gzip")
.config("spark.databricks.delta.optimizeWrite.enabled", "true")
.config("spark.databricks.delta.autoCompact.enabled", "true")
.config("spark.sql.warehouse.dir", os.path.abspath(warehouse_dir))
.config(
"javax.jdo.option.ConnectionURL",
f"jdbc:derby:{os.path.abspath(metastore_db_path)};create=true",
)
) # .config("spark.sql.catalogImplementation", "hive") \
# .enableHiveSupport()
# Initialize Spark with Delta
spark = configure_spark_with_delta_pip(builder).getOrCreate()
print(
f"SparkSession created with Delta and persistent metastore at: {warehouse_dir}"
)
return spark
CREATE DATABASE AND DELTA TABLES¶
In [3]:
def create_database(spark_session, database_name):
"""
Creates a database if it does not already exist.
Parameters:
spark_session (SparkSession): The active SparkSession.
database_name (str): The name of the database to be created.
"""
spark_session.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
print(f"Database '{database_name}' created or already exists.")
def list_all_databases_and_tables(spark_session):
"""
Lists all databases and their tables in Spark.
Parameters:
spark_session (SparkSession): The active SparkSession.
Returns:
dict: A dictionary where keys are database names and values are lists of table names in each database.
"""
# List all databases
databases = spark_session.catalog.listDatabases()
# Create a dictionary to store database names and their corresponding table lists
database_tables = {}
print("The following databases and tables are present in the Spark Catalog.")
print()
for database in databases:
# Set the current database to the specified database
spark_session.sql(f"USE {database.name}")
# List all tables in the database
tables = spark_session.catalog.listTables(database.name)
# Extract table names from the list of table objects
table_names = [table.name for table in tables]
# Store the tables for the current database
database_tables[database.name] = table_names
# Print the database and tables
for table in table_names:
print(f"Database: {database.name}, Table: {table}")
print()
return database_tables
CREATE SPARK DATAFRAME¶
In [4]:
def create_dataframe_from_list_dict_using_alphabetical_order_from_columns(
spark_session, list_data_dict
):
"""
Creates a Spark DataFrame from a list of dictionaries, reordering columns in alphabetical order.
Parameters:
spark_session (SparkSession): The active Spark session.
list_data_dict (list): A list of dictionaries, where each dictionary represents a row.
Returns:
DataFrame: The created Spark DataFrame with columns in alphabetical order.
"""
if list_data_dict:
df = spark_session.createDataFrame(list_data_dict)
return df
else:
raise ValueError("The input list is empty.")
def create_dataframe_from_list_dict(spark_session, list_data_dict):
"""
Creates a Spark DataFrame from a list of dictionaries, preserving the order of the keys.
Parameters:
spark_session (SparkSession): The active Spark session.
list_data_dict (list): A list of dictionaries, where each dictionary represents a row.
Returns:
DataFrame: The created Spark DataFrame with columns in the order of the keys.
"""
if not list_data_dict:
raise ValueError("The input list is empty.")
# Get the order of keys from the first dictionary
columns_order = list(list_data_dict[0].keys())
# Create the DataFrame and reorder columns
df = spark_session.createDataFrame(list_data_dict)
df = df.select(*columns_order) # Reorder columns explicitly
return df
def split_spark_dataframe(spark_dataframe, num_parts):
"""
Splits a Spark DataFrame into the specified number of parts, ensuring each part has at least one row.
If the requested number of parts exceeds the total rows, it creates as many balanced parts as possible.
Parameters:
spark_dataframe (DataFrame): The Spark DataFrame to be split.
num_parts (int): The desired number of parts to split the DataFrame into.
Returns:
List[DataFrame]: A list containing the split DataFrames.
"""
total_rows = spark_dataframe.count()
if total_rows == 0:
print("The DataFrame is empty. No parts created.")
return []
# Adjust number of parts if more parts are requested than rows
actual_parts = min(num_parts, total_rows)
# Calculate base rows per part and distribute remaining rows
rows_per_part = total_rows // actual_parts
extra_rows = total_rows % actual_parts
split_dataframes = []
start_row = 0
for i in range(actual_parts):
# Calculate rows for the current part
rows_in_this_part = rows_per_part + (1 if i < extra_rows else 0)
end_row = start_row + rows_in_this_part
# Select the rows for the current part using offset and limit
split_dataframes.append(
spark_dataframe.offset(start_row).limit(rows_in_this_part)
)
start_row = end_row # Update start row for the next part
print(f"Successfully created {len(split_dataframes)} DataFrames.")
return split_dataframes
CREATE DELTA TABLES¶
In [5]:
import os
import shutil
def create_delta_table_with_spark_dataframe_and_register(
spark_session, database_name, table_name, spark_dataframe, warehouse_dir, partition_by=None
):
"""
Creates a Delta table in the specified database and registers it with SQL.
Parameters:
spark_session (SparkSession): The active SparkSession.
database_name (str): The name of the database where the table will be created.
table_name (str): The name of the table to be created.
spark_dataframe (DataFrame): The Spark DataFrame whose data will be used to create the table.
warehouse_dir (str): The root directory for the warehouse.
partition_by (list, optional): List of column names to partition the table by (default is None).
"""
# Ensure the database exists or create it
available_databases = [db.name for db in spark_session.catalog.listDatabases()]
if database_name not in available_databases:
spark_session.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
# Set the active database
spark_session.sql(f"USE {database_name}")
# Define the table path based on the database and table name
table_path = f"{warehouse_dir}/{database_name}/{table_name}"
write_options = spark_dataframe.write.format("delta").mode("overwrite")
# Apply partitioning if specified
if partition_by:
write_options.partitionBy(*partition_by)
# Save the DataFrame as a Delta table in the location
write_options.save(table_path)
# Register the Delta table in the Spark catalog using SQL
spark_session.sql(
f"""
CREATE OR REPLACE TABLE {table_name}
USING DELTA
AS
SELECT * FROM parquet.`{table_path}`;
"""
)
# Delete the files generated in the table path
if os.path.exists(table_path):
shutil.rmtree(table_path)
print(f"Temporary files at '{table_path}' have been deleted.")
else:
print(f"No temporary files found at '{table_path}'.")
print(
f"Table '{table_name}' created and registered at '{table_path}' in database '{database_name}' with partitioning by {partition_by if partition_by else 'None'}."
)
def save_dataframe_as_parquet(spark_dataframe, file_name, file_path, partition_by=None):
"""
Saves a Spark DataFrame to the specified path in Parquet format.
Parameters:
spark_dataframe (DataFrame): The Spark DataFrame to be saved.
file_name (str): The name of the file (or dataset) to be created.
file_path (str): The location where the file will be stored.
partition_by (list, optional): List of column names to partition the data by (default is None).
"""
write_options = spark_dataframe.write.format("parquet").mode("overwrite")
if partition_by:
write_options.partitionBy(*partition_by)
write_options.save(file_path)
print(f"DataFrame saved as Delta Parquet at '{file_path}/{file_name}' with partitioning by {partition_by if partition_by else 'None'}.")
def save_dataframe_as_delta_parquet(spark_dataframe, file_name, file_path, partition_by=None):
"""
Saves a Spark DataFrame to the specified path in Delta Parquet format.
Parameters:
spark_dataframe (DataFrame): The Spark DataFrame to be saved.
file_name (str): The name of the file (or dataset) to be created.
file_path (str): The location where the file will be stored.
partition_by (list, optional): List of column names to partition the data by (default is None).
"""
write_options = spark_dataframe.write.format("delta").mode("overwrite")
# Apply partitioning if partition_by is provided
if partition_by:
write_options.partitionBy(*partition_by)
# Save the DataFrame as a Delta Parquet file at the specified location
write_options.save(f"{file_path}/{file_name}")
print(f"DataFrame saved as Delta Parquet at '{file_path}/{file_name}' with partitioning by {partition_by if partition_by else 'None'}.")
def create_delta_table_in_database(spark_session, database_name, table_name, spark_dataframe, warehouse_dir, partition_by=None):
"""
Creates a Delta table in a Spark database, ensuring that any existing table or directory is deleted before creating
the new table. The table is created with the delta.enableChangeDataFeed property enabled.
Parameters:
spark_session (SparkSession): The active Spark session to run SQL queries.
database_name (str): The name of the database where the table will be stored.
table_name (str): The name of the table to be created.
spark_dataframe (DataFrame): The DataFrame to be saved as a Delta table.
warehouse_dir (str): The base directory where Delta tables are stored.
partition_by (list, optional): List of columns to partition the table by. If not provided, the table will not be partitioned.
Exceptions:
- If the specified partition columns do not exist in the DataFrame, a ValueError will be raised.
"""
# Define the path for the Delta table
table_path = f"{warehouse_dir}/{database_name}/{table_name}"
full_table_name = f"{database_name}.{table_name}"
# Remove the table directory if it exists (manually)
if os.path.exists(table_path):
shutil.rmtree(table_path) # Remove the directory and its contents
print(f"Directory for table '{full_table_name}' deleted.")
# Drop the table from the database if it exists (to avoid conflicts)
try:
spark_session.sql(f"DROP TABLE IF EXISTS {full_table_name}")
print(f"Table '{full_table_name}' dropped.")
except Exception as e:
print(f"Could not drop the table: {str(e)}")
# Write the new Delta table with overwrite mode
write_options = spark_dataframe.write.format("delta").mode("overwrite")
# Check if partitioning is specified and verify columns exist in the DataFrame
if partition_by:
missing_columns = [col for col in partition_by if col not in spark_dataframe.columns]
if missing_columns:
raise ValueError(f"The following partition columns do not exist in the DataFrame: {missing_columns}")
write_options = write_options.partitionBy(*partition_by)
# Save the Delta table in the database
write_options.saveAsTable(full_table_name)
# Enable Change Data Feed on the Delta table
try:
spark_session.sql(f"ALTER TABLE {full_table_name} SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')")
print(f"Property 'delta.enableChangeDataFeed' enabled for table '{full_table_name}'.")
except Exception as e:
print(f"Error enabling 'delta.enableChangeDataFeed': {str(e)}")
print(f"Table '{table_name}' created in database '{database_name}' at '{table_path}' with partitioning by {partition_by if partition_by else 'none'}.")
def read_parquet_or_delta_file(spark_session, directory_path):
"""
Reads a Parquet or Delta file from the specified directory path.
This function attempts to read a Parquet file from the provided path using the Spark session.
If the operation fails (e.g., the file does not exist or the format is incompatible),
an error message is displayed, and None is returned.
Parameters:
spark_session (SparkSession): The active Spark session used for reading the file.
directory_path (str): The path to the directory containing the Parquet or Delta file.
Returns:
DataFrame: The DataFrame containing the file's data if successfully read.
None: If an error occurs during the file reading process.
Example:
# Create a Spark session
spark = SparkSession.builder.appName("ReadExample").getOrCreate()
# Read a Parquet or Delta file
df = read_parquet_or_delta_file(spark, "/path/to/directory")
if df is not None:
df.show()
else:
print("Failed to read the file.")
"""
try:
df = spark_session.read.parquet(directory_path)
print(f"Successfully read Parquet file from '{directory_path}'.")
return df
except Exception as e:
print(f"Error reading file from '{directory_path}': {e}")
return None
def execute_spark_sql_query(
spark_session: SparkSession, query: str
) -> "pyspark.sql.dataframe.DataFrame":
"""
Execute a SQL query and return the results as a DataFrame.
:param spark_session: The SparkSession object.
:param query: The SQL query to execute.
:return: A PySpark DataFrame containing the query results.
"""
return spark_session.sql(query)
CRUD SPARK DATAFRAMES¶
In [6]:
from pyspark.sql.functions import expr, col
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import pandas as pd
from pandas import Timestamp
def is_delta_table(spark_session: SparkSession, delta_table_path: str) -> bool:
"""
Checks if a given path corresponds to a Delta table.
Parameters:
spark_session (SparkSession): The active Spark session.
delta_table_path (str): The file path to check.
Returns:
bool: True if the path is a Delta table, False otherwise.
"""
return (
DeltaTable.forPath(spark_session, delta_table_path)
if DeltaTable.isDeltaTable(spark_session, delta_table_path)
else False
)
def restore_delta_lake_to_version(
spark_session: SparkSession, delta_table_path: str, version: int = None, timestamp: str = None
):
"""
Restores a Delta table to a specific version or timestamp.
Parameters:
spark_session (SparkSession): The active Spark session.
delta_table_path (str): The Delta table path.
version (int, optional): The version to restore to.
timestamp (str, optional): The timestamp to restore to in ISO format.
Returns:
pandas.DataFrame: The updated Delta table details or history.
"""
try:
delta_table = is_delta_table(spark_session, delta_table_path)
if delta_table:
delta_table.optimize().executeCompaction()
history_df = delta_table.history().toPandas()
available_versions = history_df["version"].tolist()
available_timestamps = history_df["timestamp"].tolist()
if version is not None:
if version not in available_versions:
print(f"Error: Version {version} does not exist.")
return history_df
delta_table.restoreToVersion(version)
print(f"Restored to version {version}.")
return delta_table.detail().toPandas()
if timestamp is not None:
timestamp_obj = Timestamp(timestamp)
if timestamp_obj not in available_timestamps:
print(f"Error: Timestamp {timestamp} does not exist.")
return history_df
delta_table.restoreToTimestamp(timestamp)
print(f"Restored to timestamp {timestamp}.")
return delta_table.detail().toPandas()
return history_df
else:
print(f"{delta_table_path} does not contain a Delta table.")
except Exception as e:
print(f"Error restoring Delta table: {str(e)}")
def write_into_delta_lake(
spark_session: SparkSession, delta_table_path: str, spark_dataframe
):
"""
Writes data into a Delta table, avoiding duplicates by comparing existing columns
and adding new columns if necessary.
Parameters:
spark_session (SparkSession): The active Spark session.
delta_table_path (str): The Delta table path.
spark_dataframe (DataFrame): The Spark DataFrame to write.
Returns:
None
"""
try:
# Check if the path corresponds to a Delta table
delta_table = is_delta_table(spark_session, delta_table_path)
if delta_table:
# Optimize the Delta table before processing
delta_table.optimize().executeCompaction()
# Load the existing data from the Delta table
existing_data = spark_session.read.format("delta").load(delta_table_path)
# Identify common columns between the existing data and the new data
common_columns = list(set(existing_data.columns).intersection(set(spark_dataframe.columns)))
# Join the existing data with the new data on the common columns
# We will use 'left_anti' to find new records based on differences
new_rows = spark_dataframe.join(
existing_data, on=common_columns, how="left_anti"
)
# If there are new rows, append them to the table
if not new_rows.isEmpty():
# Write the new rows (with new columns if present)
new_rows.write.option("mergeSchema", "true").mode("append").format("delta").save(delta_table_path)
print("Added new data without duplicates.")
else:
print("No new rows to append.")
else:
# If the Delta table does not exist, create a new table
print(f"{delta_table_path} does not contain a Delta table.")
spark_dataframe.write.format("delta").mode("overwrite").save(delta_table_path)
print("Created Delta table with new data.")
except Exception as e:
print(f"Error writing to Delta table: {str(e)}")
def delete_from_delta_lake(
spark_session: SparkSession, delta_table_path: str, condition: str
):
"""
Deletes rows from a Delta table based on a condition.
Parameters:
spark_session (SparkSession): The active Spark session.
delta_table_path (str): The Delta table path.
condition (str): The condition for deletion.
Returns:
pandas.DataFrame: Updated Delta table details.
"""
try:
delta_table = is_delta_table(spark_session, delta_table_path)
if delta_table:
delta_table.optimize().executeCompaction()
preview_df = delta_table.toDF().filter(condition).limit(1).collect()
if not preview_df:
print(f"No records match the condition '{condition}'.")
return delta_table.toDF().filter(condition).toPandas()
delta_table.delete(condition)
print(f"Deleted records with condition '{condition}'.")
return delta_table.detail().toPandas()
else:
print(f"{delta_table_path} does not contain a Delta table.")
except Exception as e:
print(f"Error deleting from Delta table: {str(e)}")
def update_from_delta_lake(
spark_session: SparkSession, delta_table_path: str, condition: str, set_expression: dict
):
"""
Updates rows in a Delta table based on a condition.
Parameters:
spark_session (SparkSession): The active Spark session.
delta_table_path (str): The Delta table path.
condition (str): The condition for updating rows.
set_expression (str): The update expression (e.g., "column = value").
Returns:
pandas.DataFrame: Updated Delta table details.
"""
try:
delta_table = is_delta_table(spark_session, delta_table_path)
if delta_table:
delta_table.optimize().executeCompaction()
preview_df = delta_table.toDF().filter(condition).limit(1).collect()
if not preview_df:
print(f"No records match the condition '{condition}'.")
return delta_table.toDF().filter(condition).toPandas()
delta_table.update(condition=expr(condition), set=set_expression)
print(f"Updated records with condition '{condition}'.")
return delta_table.detail().toPandas()
else:
print(f"{delta_table_path} does not contain a Delta table.")
except Exception as e:
print(f"Error updating Delta table: {str(e)}")
def merge_from_delta_lake(
spark_session: SparkSession, delta_table_path: str, sync_data_df, identifier_column: str
):
"""
Merges synchronized data into a Delta table.
This function performs a merge operation to update existing records or insert new ones
from the source DataFrame into the target Delta table. The merge is based on a specified
unique identifier column.
Parameters:
spark_session (SparkSession): The active Spark session.
delta_table_path (str): The file path to the Delta table.
sync_data_df (DataFrame): The source DataFrame containing the synchronized data.
identifier_column (str): The column used as the unique identifier for the merge.
Returns:
None
Example:
>>> merge_from_delta_lake(spark_session, "/path/to/delta", sync_data_df, "id")
Merge completed.
"""
try:
# Check if the path corresponds to a Delta table
delta_table = is_delta_table(spark_session, delta_table_path)
if delta_table:
# Optimize the Delta table before the merge
delta_table.optimize().executeCompaction()
# Perform the merge operation
delta_table.alias("sync").merge(
sync_data_df.alias("source"),
f"sync.{identifier_column} = source.{identifier_column}",
).whenMatchedUpdate(
set={col_name: col(f"source.{col_name}") for col_name in sync_data_df.columns}
).whenNotMatchedInsert(
values={col_name: col(f"source.{col_name}") for col_name in sync_data_df.columns}
).execute()
print("Merge operation completed successfully.")
else:
# Create the Delta table if it does not exist
print(f"The directory '{delta_table_path}' does not contain a Delta table.")
sync_data_df.write.format("delta").mode("overwrite").save(delta_table_path)
print(f"Delta table created at '{delta_table_path}' with synchronized data.")
except Exception as e:
print(f"Error during the merge operation on the Delta table '{delta_table_path}': {str(e)}")
def show_historic_version_from_delta_file(
spark_session, file_path, version=None, operation_filter=None, sort_by=None
):
"""
Show historic changes from a Delta table version, handling column mismatches.
"""
try:
from pyspark.sql.functions import lit
# Retrieve Delta Table and its history
delta_table = DeltaTable.forPath(spark_session, file_path)
history_df = delta_table.history()
if operation_filter:
# Filter history based on operation type
result_df = history_df.filter(
history_df.operation.contains(operation_filter)
).toPandas()
elif version is not None:
if version < 0 or version >= history_df.count():
result_df = history_df.toPandas()
else:
# Get metadata for the requested version
history_row = history_df.filter(history_df.version == version).collect()[0]
operation, timestamp, user = (
history_row.operation,
history_row.timestamp,
history_row.userName or "Unknown",
)
# Load current and previous versions of the Delta table
df_current = (
spark_session.read.format("delta")
.option("versionAsOf", version)
.load(file_path)
)
df_previous = (
spark_session.read.format("delta")
.option("versionAsOf", version - 1)
.load(file_path)
if version > 0
else None
)
# Align columns
if df_previous:
# Get all unique columns from both DataFrames
current_columns = set(df_current.columns)
previous_columns = set(df_previous.columns)
# Add missing columns to each DataFrame with null values
for col in previous_columns - current_columns:
df_current = df_current.withColumn(col, lit(None))
for col in current_columns - previous_columns:
df_previous = df_previous.withColumn(col, lit(None))
# Reorder columns to ensure the same order in both DataFrames
common_columns = sorted(list(current_columns | previous_columns))
df_current = df_current.select(common_columns)
df_previous = df_previous.select(common_columns)
# Handle operations
if operation == "UPDATE":
df_removed = (
df_previous.subtract(df_current).toPandas()
if df_previous
else None
)
df_added = (
df_current.subtract(df_previous).toPandas()
if df_previous
else df_current.toPandas()
)
if df_removed is not None:
df_removed["ChangeType"] = "PRE UPDATE"
if df_added is not None:
df_added["ChangeType"] = "UPDATE"
result_df = pd.concat([df_removed, df_added]).reset_index(drop=True)
elif operation == "DELETE" and df_previous:
result_df = (
df_previous.subtract(df_current)
.toPandas()
.assign(ChangeType="Deleted")
)
else:
result_df = (
df_current.subtract(df_previous).toPandas()
if df_previous
else df_current.toPandas()
)
# Sort results if requested
if sort_by and sort_by in result_df.columns:
result_df = result_df.sort_values(by=sort_by)
else:
print(f"Warning: Column '{sort_by}' not found in the DataFrame.")
else:
# Return full history if no version is specified
result_df = history_df.toPandas()
return result_df
except Exception as e:
print(f"Error: {str(e)}. Could not retrieve version or history from the Delta table.")
return None
def read_delta_table_with_change_data_control(spark_session, delta_table_path, starting_version=0, ending_version=0):
"""
Reads data from a Delta table with change data capture (CDC) enabled.
This function loads data from a Delta table and retrieves the changes between specified versions.
If the provided versions are invalid or if `starting_version` is greater than `ending_version`,
the function adjusts the versions to ensure the correct range is used.
Args:
spark_session (SparkSession): The Spark session used to load the Delta table.
delta_table_path (str): The path to the Delta table.
starting_version (int, optional): The starting version for change data retrieval. Defaults to 0.
ending_version (int, optional): The ending version for change data retrieval. Defaults to 0.
Returns:
pandas.DataFrame: A Pandas DataFrame containing the data from the Delta table.
"""
try:
# Initialize DeltaTable object
delta_table = DeltaTable.forPath(spark_session, delta_table_path)
# Get the Delta table history to check version range
history_df = delta_table.history()
max_version = history_df.select("version").rdd.max()[0]
# Adjust if starting_version is greater than ending_version
if starting_version > ending_version:
starting_version, ending_version = ending_version, starting_version
# Check if the provided versions are valid
if 0 <= starting_version <= max_version and 0 <= ending_version <= max_version:
return (
spark_session.read
.format("delta")
.option("readChangeData", "true")
.option("startingVersion", starting_version)
.option("endingVersion", ending_version)
.load(delta_table_path)
.toPandas()
)
else:
# Default behavior when versions are out of range
return (
spark_session.read
.format("delta")
.option("readChangeData", "true")
.option("startingVersion", 0)
.load(delta_table_path)
.toPandas()
)
except Exception as e:
print(f"Error reading Delta table: {e}")
return None
CRUD VERSION 2¶
In [7]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable
def insert_into_delta_table(spark_session, database_name, table_name, spark_dataframe):
"""
Inserts records into a Delta table.
Parameters:
spark_session (SparkSession): The active Spark session.
database_name (str): The name of the database where the Delta table exists.
table_name (str): The name of the Delta table where data will be inserted.
df (DataFrame): The Spark DataFrame containing the data to insert.
Returns:
None
"""
# Create the full table name with the database
full_table_name = f"{database_name}.{table_name}"
# Insert the data into the Delta table in append mode
spark_dataframe.write.format("delta").mode("append").saveAsTable(full_table_name)
print(f"Records inserted into Delta table: {full_table_name}")
def delete_from_delta_table(spark_session, database_name, table_name, condition):
"""
Deletes records from a Delta table based on a specified condition.
Parameters:
spark_session (SparkSession): The active Spark session.
database_name (str): The name of the database where the Delta table exists.
table_name (str): The name of the Delta table where data will be deleted.
condition (str): The SQL condition to specify which records to delete.
Returns:
None
"""
from delta.tables import DeltaTable
try:
# Create the full table name with the database
full_table_name = f"{database_name}.{table_name}"
# Load the Delta table
delta_table = DeltaTable.forName(spark_session, full_table_name)
# Perform the delete operation
delta_table.delete(condition)
print(f"Records matching condition '{condition}' deleted from Delta table: {full_table_name}")
except Exception as e:
print(f"Error: Could not delete records from Delta table. {str(e)}")
def update_in_delta_table(spark_session, database_name, table_name, condition, set_dict):
"""
Updates records in a Delta table based on a specified condition and set clause.
Parameters:
spark_session (SparkSession): The active Spark session.
database_name (str): The name of the database where the Delta table exists.
table_name (str): The name of the Delta table where data will be updated.
set_dict (dict): A dictionary where the keys are column names and the values are the new values for those columns.
condition (str): The SQL condition to specify which records to update.
Returns:
None
"""
try:
# Create the full table name with the database
full_table_name = f"{database_name}.{table_name}"
# Load the Delta table
delta_table = DeltaTable.forName(spark_session, full_table_name)
# Convert integer and other literals into Spark SQL expressions using F.lit()
set_expr = {col: F.lit(value) if isinstance(value, (int, float, str)) else value
for col, value in set_dict.items()}
# Perform the update operation
delta_table.update(condition, set_expr)
print(f"Records matching condition '{condition}' updated in Delta table: {full_table_name}")
except Exception as e:
print(f"Error: Could not update records in Delta table. {str(e)}")
FUNCTION TO CREATE JOINS IN SPARK¶
In [8]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
def join_spark_dataframes(
left_df: DataFrame,
right_df: DataFrame,
join_type: str,
join_condition: str = None,
left_column: str = None,
right_column: str = None,
return_pandas: bool = False,
execution_plan: bool = False,
):
"""
Realiza diferentes tipos de uniones entre dos DataFrames y permite capturar los nombres de las variables reales.
"""
# Diccionario de alias para cada tipo de unión
join_aliases = {
"inner": ["inner", "inner_join"],
"left": ["left", "left_join"],
"left_anti": ["left_anti", "anti_left", "anti_left_join"],
"right": ["right", "right_join"],
"right_anti": ["right_anti", "anti_right", "anti_right_join"],
"outer": ["outer", "outer_join"],
"outer_anti": ["outer_anti", "anti_outer_join", "outer_join_anti"],
"cross": ["cross", "cross_join"],
"self": ["self", "self_join"],
"except": ["except", "exception","exceptAll", "except All", "except_all", "except all"],
"intersect": ["intersect", "intersection"],
"union": ["union"],
"union_all": ["union_all", "union all"],
}
# Mapeo único de uniones a su tipo principal
join_operations = {
"inner": lambda: left_df.join(
right_df, left_df[left_column] == right_df[right_column], "inner"
),
"left": lambda: left_df.join(
right_df, left_df[left_column] == right_df[right_column], "left"
),
"left_anti": lambda: left_df.join(
right_df, left_df[left_column] == right_df[right_column], "left_anti"
),
"right": lambda: left_df.join(
right_df, left_df[left_column] == right_df[right_column], "right"
),
"right_anti": lambda: right_df.join(
left_df, right_df[right_column] == left_df[left_column], "left_anti"
),
"right_anti_v2": lambda: left_df.join(
right_df, left_df[left_column] == right_df[right_column], "right"
)
.filter(left_df[left_column].isNull())
.select(*right_df.columns),
"outer": lambda: left_df.join(
right_df, left_df[left_column] == right_df[right_column], "outer"
),
"outer_anti": lambda: left_df.join(
right_df, left_df[left_column] == right_df[right_column], "outer"
).filter(
(left_df[left_column].isNotNull() & right_df[right_column].isNull())
| (left_df[left_column].isNull() & right_df[right_column].isNotNull())
),
"cross": lambda: left_df.crossJoin(right_df),
"self": lambda: left_df.alias("table_one").join(
left_df.alias("table_two"),
F.col(f"table_one.{left_column}") == F.col(f"table_two.{right_column}"),
"inner"
),
"intersect": lambda: left_df.intersect(right_df),
"except": lambda: left_df.exceptAll(right_df),
"union": lambda: left_df.union(right_df),
"union_all": lambda: left_df.unionByName(right_df, allowMissingColumns=True), # tiene que ser los mismos nombres de columnas en ambos dataframes, sino toma la de la izquierda
}
# Validar que el tipo de join sea válido (revisando en el mapeo de alias)
requested_join = join_type
join_type = next(
(key for key, aliases in join_aliases.items() if join_type in aliases), None
)
if not join_type:
raise ValueError(f"Tipo de join '{requested_join}' no es válido.")
if join_condition and isinstance(join_condition, str):
# Si join_condition es una cadena, convertirla a expresión de columna de PySpark
join_condition = F.expr(join_condition)
joined_df = left_df.join(right_df, join_condition, join_type)
elif left_column and right_column:
# Ejecutar la operación de unión
joined_df = join_operations[join_type]()
if execution_plan:
print(f"Query ejecutado:")
joined_df.explain(True) # Usar .explain(True) para obtener detalles del plan de ejecución
# Retornar el resultado en Pandas o como DataFrame de Spark
if return_pandas:
return joined_df.toPandas()
else:
return joined_df
CLAUSES FOR SPARK SQL QUERIES ¶
| Operación | SQL Normal (ANSI SQL) | Spark SQL | DataFrame API |
|---|---|---|---|
| SELECT | SELECT columna FROM tabla |
SELECT columna FROM tabla |
df.select("columna") |
| FROM | SELECT * FROM tabla |
SELECT * FROM tabla |
df |
| WHERE | SELECT * FROM tabla WHERE condicion |
SELECT * FROM tabla WHERE condicion |
df.filter("condicion") |
| GROUP BY | SELECT columna, COUNT(*) FROM tabla GROUP BY columna |
SELECT columna, COUNT(*) FROM tabla GROUP BY columna |
df.groupBy("columna").count() |
| HAVING | SELECT columna, COUNT(*) FROM tabla GROUP BY columna HAVING COUNT(*) > 10 |
SELECT columna, COUNT(*) FROM tabla GROUP BY columna HAVING COUNT(*) > 10 |
df.groupBy("columna").count().filter("count > 10") |
| JOIN (INNER) | SELECT * FROM tabla1 INNER JOIN tabla2 ON tabla1.id = tabla2.id |
SELECT * FROM tabla1 INNER JOIN tabla2 ON tabla1.id = tabla2.id |
df1.join(df2, df1.id == df2.id) |
| JOIN (LEFT OUTER) | SELECT * FROM tabla1 LEFT JOIN tabla2 ON tabla1.id = tabla2.id |
SELECT * FROM tabla1 LEFT JOIN tabla2 ON tabla1.id = tabla2.id |
df1.join(df2, df1.id == df2.id, "left") |
| JOIN (RIGHT OUTER) | SELECT * FROM tabla1 RIGHT JOIN tabla2 ON tabla1.id = tabla2.id |
SELECT * FROM tabla1 RIGHT JOIN tabla2 ON tabla1.id = tabla2.id |
df1.join(df2, df1.id == df2.id, "right") |
| JOIN (FULL OUTER) | SELECT * FROM tabla1 FULL OUTER JOIN tabla2 ON tabla1.id = tabla2.id |
SELECT * FROM tabla1 FULL OUTER JOIN tabla2 ON tabla1.id = tabla2.id |
df1.join(df2, df1.id == df2.id, "outer") |
| LEFT ANTI JOIN (Equivalente en ANSI SQL) | SELECT * FROM tabla1 LEFT JOIN tabla2 ON tabla1.id = tabla2.id WHERE tabla2.id IS NULL |
SELECT * FROM tabla1 LEFT ANTI JOIN tabla2 ON tabla1.id = tabla2.id |
df1.join(df2, df1.id == df2.id, "left_anti") |
| UNION | SELECT columna FROM tabla1 UNION SELECT columna FROM tabla2 |
SELECT columna FROM tabla1 UNION SELECT columna FROM tabla2 |
df1.union(df2) |
| UNION ALL | SELECT columna FROM tabla1 UNION ALL SELECT columna FROM tabla2 |
SELECT columna FROM tabla1 UNION ALL SELECT columna FROM tabla2 |
df1.unionByName(df2) |
| INTERSECT | SELECT columna FROM tabla1 INTERSECT SELECT columna FROM tabla2 |
SELECT columna FROM tabla1 INTERSECT SELECT columna FROM tabla2 |
df1.intersect(df2) |
| EXCEPT | SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2 |
SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2 |
df1.exceptAll(df2) |
| EXCEPT | SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2 |
SELECT columna FROM tabla1 EXCEPT SELECT columna FROM tabla2 |
df1.select("columna").subtract(df2.select("columna")) |
| AGGREGATE (SUM) | SELECT SUM(columna) FROM tabla |
SELECT SUM(columna) FROM tabla |
df.agg({"columna": "sum"}) |
| AGGREGATE (AVG) | SELECT AVG(columna) FROM tabla |
SELECT AVG(columna) FROM tabla |
df.agg({"columna": "avg"}) |
| AGGREGATE (MAX) | SELECT MAX(columna) FROM tabla |
SELECT MAX(columna) FROM tabla |
df.agg({"columna": "max"}) |
| AGGREGATE (MIN) | SELECT MIN(columna) FROM tabla |
SELECT MIN(columna) FROM tabla |
df.agg({"columna": "min"}) |
| AGGREGATE (COUNT) | SELECT COUNT(*) FROM tabla |
SELECT COUNT(*) FROM tabla |
df.count() |
| DISTINCT | SELECT DISTINCT columna FROM tabla |
SELECT DISTINCT columna FROM tabla |
df.select("columna").distinct() |
| ORDER BY | SELECT * FROM tabla ORDER BY columna ASC |
SELECT * FROM tabla ORDER BY columna ASC |
df.orderBy("columna") |
| LIMIT | SELECT * FROM tabla LIMIT 10 |
SELECT * FROM tabla LIMIT 10 |
df.limit(10) |
| OFFSET | SELECT columna FROM tabla LIMIT 10 OFFSET 5 |
SELECT columna FROM tabla LIMIT 10 OFFSET 5 |
df.offset(5).limit(10) |
| CASE WHEN (IF ELSE) | SELECT CASE WHEN columna > 10 THEN 'Alto' ELSE 'Bajo' END FROM tabla |
SELECT CASE WHEN columna > 10 THEN 'Alto' ELSE 'Bajo' END FROM tabla |
df.withColumn("nuevo_columna", when(df["columna"] > 10, "Alto").otherwise("Bajo")) |
| IS NULL | SELECT * FROM tabla WHERE columna IS NULL |
SELECT * FROM tabla WHERE columna IS NULL |
df.filter(df["columna"].isNull()) |
| IS NOT NULL | SELECT * FROM tabla WHERE columna IS NOT NULL |
SELECT * FROM tabla WHERE columna IS NOT NULL |
df.filter(df["columna"].isNotNull()) |
| CAST | SELECT CAST(columna AS INT) FROM tabla |
SELECT CAST(columna AS INT) FROM tabla |
df.withColumn("columna", df["columna"].cast("int")) |
DOWNLOAD DATASET Y CREATE PANDAS DATAFRAME¶
In [9]:
import pandas as pd
from pandas_dataset_handler import PandasDatasetHandler
import gc
# URL del archivo Parquet en GitHub
parquet_file = "https://raw.githubusercontent.com/JorgeCardona/data-collection-json-csv-sql/main/parquet/bulk_data_20.parquet"
pandas_df = PandasDatasetHandler.load_dataset(parquet_file)
parquet_file_name = "dataset.parquet"
pandas_df.to_parquet(parquet_file_name, index=False)
print(pandas_df.shape)
pandas_df.head()
# Liberar memoria eliminando el DataFrame
del pandas_df
gc.collect() # Forzar recolección de basura
print("Pandas DataFrame eliminado de la memoria.")
File 'https://raw.githubusercontent.com/JorgeCardona/data-collection-json-csv-sql/main/parquet/bulk_data_20.parquet' successfully loaded as parquet. (250000, 19) Pandas DataFrame eliminado de la memoria.
SPARK SESSION ¶
In [10]:
# Example usage
app_name = "Delta Spark"
# Ruta para el directorio del metastore
base_dir = "./warehouse-spark"
warehouse_dir = f"{base_dir}/spark_catalog/database"
metastore_db_path = f"{base_dir}/metastore/metastore_db"
spark_session = create_spark_session(
app_name=app_name, warehouse_dir=warehouse_dir, metastore_db_path=metastore_db_path
)
# Set this configuration before running your queries
spark_session.conf.set(
"spark.sql.debug.maxToStringFields", "1000"
) # Set this to a higher number
# Ajusta el número de particiones dinámicamente en base al número de núcleos disponibles
num_particiones = max(2, spark_session.sparkContext.defaultParallelism) # Al menos 2 particiones, ajustado a los núcleos
spark_session.conf.set("spark.sql.shuffle.partitions", num_particiones) # Ajusta el número de particiones según el entorno
spark_session.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true") # enable schema evolution
spark_session.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")#config to enable all new Delta tables with Change Data Feed
Warning: Ignoring non-Spark config property: javax.jdo.option.ConnectionURL
:: loading settings :: url = jar:file:/usr/local/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache The jars for the packages stored in: /root/.ivy2/jars io.delta#delta-spark_2.12 added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-55a1bfce-d4ba-4929-affe-45cd009be014;1.0 confs: [default] found io.delta#delta-spark_2.12;3.2.1 in central found io.delta#delta-storage;3.2.1 in central found org.antlr#antlr4-runtime;4.9.3 in central :: resolution report :: resolve 155ms :: artifacts dl 6ms :: modules in use: io.delta#delta-spark_2.12;3.2.1 from central in [default] io.delta#delta-storage;3.2.1 from central in [default] org.antlr#antlr4-runtime;4.9.3 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 3 | 0 | 0 | 0 || 3 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-55a1bfce-d4ba-4929-affe-45cd009be014 confs: [default] 0 artifacts copied, 3 already retrieved (0kB/5ms) 24/12/30 16:10:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SparkSession created with Delta and persistent metastore at: ./warehouse-spark/spark_catalog/database
CREATE SPARK DATAFRAME FROM A CLASSIC PARQUET FILE ¶
In [11]:
spark_dataframe_sample = spark_session.read.parquet(parquet_file_name)
# Almacenar en caché el DataFrame para evitar recalcularlo en operaciones subsecuentes, mejorando el rendimiento.
spark_dataframe_sample.cache()
spark_dataframe_sample.show(truncate=False)
+-----------------------------+--------------------------------------------------------+-------------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+------------------------+--------------+--------------+--------------+-------------------------+------------+ |email |address |country |sex |age|profession |zodiac_sign|favorite_food|favorite_sport|favorite_movie_genre|favorite_animal|preferred_language|hobby |favorite_tv_show |favorite_color|favorite_drink|favorite_music|favorite_technology |favorite_car| +-----------------------------+--------------------------------------------------------+-------------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+------------------------+--------------+--------------+--------------+-------------------------+------------+ |kathryn69@example.org |060 Steven Row Apt. 380\nTimothystad, WI 93595 |Guam |Male |38 |Pilot |Aquarius |Sushi |Parkour |Superhero |Snake |Serbian |Cycling |Ozark |Sapphire |Soda |Indie |Containers |Lamborghini | |mallory01@example.org |65236 Mcclure Avenue Apt. 710\nBrendaside, CA 81540 |Zambia |Male |26 |Journalist |Taurus |Cheesecake |Lacrosse |Epic |Seal |Dutch |Fishing |Breaking Bad |Chocolate |Smoothie |R&B |Data Warehousing |Audi | |tristankent@example.com |966 Deborah Drive Suite 791\nNorth Mary, AL 20500 |Korea |Female|69 |Mechanic |Aquarius |Burrito |Golf |Romance |Fox |Romanian |Origami |The Boys |Scarlet |Coffee |Experimental |DevOps |Hyundai | |russelljohn@example.com |745 Fitzgerald Parkway Apt. 122\nSheafort, MH 96479 |Montserrat |Male |79 |Translator |Pisces |Lamb Chops |Figure Skating|Mockumentary |Frog |Lithuanian |Reading |Arrow |Teal |Chai |Blues |Blockchain |BMW | |scarlson@example.com |285 Scott Extensions Suite 651\nSouth Ronald, AS 17445 |Greece |Male |71 |Social Worker |Aries |Shawarma |Swimming |Experimental |Jellyfish |Portuguese |Learning |Fargo |Maroon |Matcha |Opera |Green Computing |Lamborghini | |middletonmargaret@example.net|3256 Jennifer Passage\nWest Denisemouth, NE 15122 |Gambia |Male |88 |Artist |Cancer |Fajitas |Taekwondo |Disaster |Antelope |Arabic |Fitness |The Mandalorian |Indigo |Kombucha |Zouk |Nanotechnology |Land Rover | |twallace@example.net |903 Nicholas Cove Apt. 156\nSouth Alexland, ND 50885 |Jordan |Male |27 |Architect |Gemini |Hot Dog |Soccer |Parody |Chimpanzee |Welsh |Astronomy |House of Cards |Peach |Americano |Latin |Containers |Aston Martin| |martinmark@example.com |12268 Daniel Haven Suite 128\nThomasside, NM 81208 |South Africa |Male |55 |Social Worker |Sagittarius|Ice Cream |Rowing |Silent Film |Octopus |Swahili |Stargazing |Succession |Chocolate |Lemonade |House |Containers |Nissan | |jfreeman@example.org |704 Randall Plains Suite 752\nNorth Donaldfurt, SC 99060|Germany |Female|79 |Teacher |Scorpio |Lasagna |Handball |Musical |Bear |Spanish |Dancing |The Flash |Brown |Black Tea |Lo-fi |Edge AI |Buick | |whitekristen@example.org |46919 Joseph Stravenue\nNorth Michael, IN 92016 |Norfolk Island |Male |65 |Electrician |Aries |Chicken Wings|Rugby |Period Piece |Sheep |Portuguese |Origami |CSI |Chartreuse |Soda |Classical |Autonomous Vehicles |Kia | |nicolerose@example.org |8419 Howard Shoals Apt. 078\nOwensshire, OK 96297 |Italy |Female|48 |NULL |Virgo |Eclairs |Skiing |Independent |Fox |Estonian |Playing Cards |Buffy the Vampire Slayer|Yellow |Soda |Cumbia |Augmented Reality |Mitsubishi | |mary11@example.com |8519 Brian Glen\nNorth Dawn, PR 77205 |Canada |Female|95 |Bartender |Taurus |Pizza |Bowling |Disaster |Penguin |Arabic |Gaming |Westworld |Orange |Green Tea |Bolero |Data Lakes |Mazda | |davidfernandez@example.net |49951 Robert Lock\nNew Amy, IA 33498 |Paraguay |Male |23 |Entrepreneur |Sagittarius|Empanadas |Shooting |Comedy |Chimpanzee |Indonesian |Language Learning|Vikings |Bronze |Tonic Water |Electronic |Data Lakes |SEAT | |sharon37@example.org |PSC 8736, Box 3940\nAPO AE 31112 |Saint Pierre and Miquelon|Male |32 |Designer |Capricorn |Dumplings |Weightlifting |Romantic Comedy |Buffalo |Welsh |Language Learning|Lucifer |Yellow |Sangria |Metal |Brain-Computer Interfaces|Audi | |brandon67@example.com |25491 Fritz Club\nMoonbury, ID 23063 |Grenada |Female|47 |Content Creator|Leo |Seafood |Baseball |Experimental |Jellyfish |Japanese |Kayaking |This Is Us |Fuchsia |Cocktail |Post-Rock |Edge AI |Maserati | |brandon32@example.org |27342 Christine Club\nSouth Cindy, SD 97227 |Bhutan |Male |85 |Athlete |Leo |Fried Rice |Polo |Steampunk |Lizard |Russian |Model Building |Grey's Anatomy |Ivory |Whiskey Sour |Country |5G |Ford | |westthomas@example.org |007 Joseph Lights\nHerbertmouth, KS 92526 |Suriname |Male |28 |Game Developer |Virgo |Spaghetti |Surfing |Noir |Monkey |Polish |Geocaching |Doctor Who |Rose |Margarita |Synthwave |Microservices |Bentley | |felicia31@example.net |46124 Rivas Dale Apt. 053\nMasonborough, OR 28373 |Montserrat |Male |56 |Accountant |Pisces |Fajitas |Snooker |Documentary |Bat |Turkish |Photography |The Office |Amethyst |Wine |World Music |Data Warehousing |Skoda | |ofreeman@example.org |93184 Allen Spurs Apt. 514\nEast Jocelynberg, NJ 55288 |Spain |Male |65 |Actor |Gemini |Eclairs |Archery |Experimental |Cat |Greek |Gardening |NCIS |Plum |Cocktail |Post-Rock |Synthetic Data |Rimac | |cannonrachael@example.net |9786 Heather Forest\nSouth Pedroshire, NE 52237 |Burundi |Female|41 |Content Creator|Leo |Pho |Equestrian |Historical |Turtle |Japanese |Snorkeling |Westworld |Silver |Kombucha |Hip-Hop |Serverless Computing |Mitsubishi | +-----------------------------+--------------------------------------------------------+-------------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+------------------------+--------------+--------------+--------------+-------------------------+------------+ only showing top 20 rows
SPLIT DATAFRAME
In [12]:
# divide el dataframe
num_parts = 2
spark_dataframe_sql, spark_dataframe_delta = split_spark_dataframe(spark_dataframe_sample, num_parts)
# Almacenar en caché el DataFrame para evitar recalcularlo en operaciones subsecuentes, mejorando el rendimiento.
spark_dataframe_sql.cache()
spark_dataframe_delta.cache()
spark_dataframe_sql.show()
spark_dataframe_delta.show()
Successfully created 2 DataFrames.
+--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+--------------------+--------------+--------------+--------------+--------------------+------------+ | email| address| country| sex|age| profession|zodiac_sign|favorite_food|favorite_sport|favorite_movie_genre|favorite_animal|preferred_language| hobby| favorite_tv_show|favorite_color|favorite_drink|favorite_music| favorite_technology|favorite_car| +--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+--------------------+--------------+--------------+--------------+--------------------+------------+ |kathryn69@example...|060 Steven Row Ap...| Guam| Male| 38| Pilot| Aquarius| Sushi| Parkour| Superhero| Snake| Serbian| Cycling| Ozark| Sapphire| Soda| Indie| Containers| Lamborghini| |mallory01@example...|65236 Mcclure Ave...| Zambia| Male| 26| Journalist| Taurus| Cheesecake| Lacrosse| Epic| Seal| Dutch| Fishing| Breaking Bad| Chocolate| Smoothie| R&B| Data Warehousing| Audi| |tristankent@examp...|966 Deborah Drive...| Korea|Female| 69| Mechanic| Aquarius| Burrito| Golf| Romance| Fox| Romanian| Origami| The Boys| Scarlet| Coffee| Experimental| DevOps| Hyundai| |russelljohn@examp...|745 Fitzgerald Pa...| Montserrat| Male| 79| Translator| Pisces| Lamb Chops|Figure Skating| Mockumentary| Frog| Lithuanian| Reading| Arrow| Teal| Chai| Blues| Blockchain| BMW| |scarlson@example.com|285 Scott Extensi...| Greece| Male| 71| Social Worker| Aries| Shawarma| Swimming| Experimental| Jellyfish| Portuguese| Learning| Fargo| Maroon| Matcha| Opera| Green Computing| Lamborghini| |middletonmargaret...|3256 Jennifer Pas...| Gambia| Male| 88| Artist| Cancer| Fajitas| Taekwondo| Disaster| Antelope| Arabic| Fitness| The Mandalorian| Indigo| Kombucha| Zouk| Nanotechnology| Land Rover| |twallace@example.net|903 Nicholas Cove...| Jordan| Male| 27| Architect| Gemini| Hot Dog| Soccer| Parody| Chimpanzee| Welsh| Astronomy| House of Cards| Peach| Americano| Latin| Containers|Aston Martin| |martinmark@exampl...|12268 Daniel Have...| South Africa| Male| 55| Social Worker|Sagittarius| Ice Cream| Rowing| Silent Film| Octopus| Swahili| Stargazing| Succession| Chocolate| Lemonade| House| Containers| Nissan| |jfreeman@example.org|704 Randall Plain...| Germany|Female| 79| Teacher| Scorpio| Lasagna| Handball| Musical| Bear| Spanish| Dancing| The Flash| Brown| Black Tea| Lo-fi| Edge AI| Buick| |whitekristen@exam...|46919 Joseph Stra...| Norfolk Island| Male| 65| Electrician| Aries|Chicken Wings| Rugby| Period Piece| Sheep| Portuguese| Origami| CSI| Chartreuse| Soda| Classical| Autonomous Vehicles| Kia| |nicolerose@exampl...|8419 Howard Shoal...| Italy|Female| 48| NULL| Virgo| Eclairs| Skiing| Independent| Fox| Estonian| Playing Cards|Buffy the Vampire...| Yellow| Soda| Cumbia| Augmented Reality| Mitsubishi| | mary11@example.com|8519 Brian Glen\n...| Canada|Female| 95| Bartender| Taurus| Pizza| Bowling| Disaster| Penguin| Arabic| Gaming| Westworld| Orange| Green Tea| Bolero| Data Lakes| Mazda| |davidfernandez@ex...|49951 Robert Lock...| Paraguay| Male| 23| Entrepreneur|Sagittarius| Empanadas| Shooting| Comedy| Chimpanzee| Indonesian|Language Learning| Vikings| Bronze| Tonic Water| Electronic| Data Lakes| SEAT| |sharon37@example.org|PSC 8736, Box 394...|Saint Pierre and ...| Male| 32| Designer| Capricorn| Dumplings| Weightlifting| Romantic Comedy| Buffalo| Welsh|Language Learning| Lucifer| Yellow| Sangria| Metal|Brain-Computer In...| Audi| |brandon67@example...|25491 Fritz Club\...| Grenada|Female| 47|Content Creator| Leo| Seafood| Baseball| Experimental| Jellyfish| Japanese| Kayaking| This Is Us| Fuchsia| Cocktail| Post-Rock| Edge AI| Maserati| |brandon32@example...|27342 Christine C...| Bhutan| Male| 85| Athlete| Leo| Fried Rice| Polo| Steampunk| Lizard| Russian| Model Building| Grey's Anatomy| Ivory| Whiskey Sour| Country| 5G| Ford| |westthomas@exampl...|007 Joseph Lights...| Suriname| Male| 28| Game Developer| Virgo| Spaghetti| Surfing| Noir| Monkey| Polish| Geocaching| Doctor Who| Rose| Margarita| Synthwave| Microservices| Bentley| |felicia31@example...|46124 Rivas Dale ...| Montserrat| Male| 56| Accountant| Pisces| Fajitas| Snooker| Documentary| Bat| Turkish| Photography| The Office| Amethyst| Wine| World Music| Data Warehousing| Skoda| |ofreeman@example.org|93184 Allen Spurs...| Spain| Male| 65| Actor| Gemini| Eclairs| Archery| Experimental| Cat| Greek| Gardening| NCIS| Plum| Cocktail| Post-Rock| Synthetic Data| Rimac| |cannonrachael@exa...|9786 Heather Fore...| Burundi|Female| 41|Content Creator| Leo| Pho| Equestrian| Historical| Turtle| Japanese| Snorkeling| Westworld| Silver| Kombucha| Hip-Hop|Serverless Computing| Mitsubishi| +--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+-----------------+--------------------+--------------+--------------+--------------+--------------------+------------+ only showing top 20 rows +--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+--------------+--------------------+--------------+--------------+--------------+--------------------+-------------+ | email| address| country| sex|age| profession|zodiac_sign|favorite_food|favorite_sport|favorite_movie_genre|favorite_animal|preferred_language| hobby| favorite_tv_show|favorite_color|favorite_drink|favorite_music| favorite_technology| favorite_car| +--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+--------------+--------------------+--------------+--------------+--------------+--------------------+-------------+ | dawn38@example.net|14388 Li Flat\nSo...| Togo|Female| 22| Entrepreneur| Libra| Ramen| Boxing| Courtroom Drama| Donkey| Italian| Scrapbooking| Lucifer| Crimson| NULL| Gospel| Robotics| Opel| |xballard@example.org|20007 Sosa Statio...| Korea| Male| 81| NULL| Gemini| Gnocchi| Tennis| Teen Drama| Rabbit| Tamil| Photography| Lucifer| Gold| Nitro Coffee| Indie| Zero Trust Security| Rolls-Royce| |charles72@example...|8950 Shawn Juncti...| Togo|Female| 68| Firefighter| Pisces| NULL| Paragliding| Magical Realism| Kangaroo| Georgian| Meditation| This Is Us| NULL| Matcha| Reggae| Containers| NULL| | adaniel@example.net|58658 Frazier Jun...| Liechtenstein| Male| 31| Web Developer| Aries| Burrito| Baseball| Spy| Starfish| Slovenian| Scrapbooking| Homeland| Yellow| Daiquiri| Cumbia| Data Warehousing| Isuzu| |hedwards@example.net|7507 Jessica Tunn...| Puerto Rico|Female| 64| Musician| Cancer|Chicken Wings| Archery| Independent| Penguin| Thai| Pottery| The Crown| Scarlet| Cold Brew| Shoegaze| Machine Learning| Mazda| |nicholas88@exampl...|Unit 2882 Box 228...| French Guiana| Male| 29| Researcher| Taurus| Fajitas| Parkour| Fantasy Romance| Koala| Ukrainian| Canoeing| The Witcher| Salmon| Nitro Coffee| Reggae| IoT| Isuzu| |carpentertodd@exa...|17382 Michael Fal...| Guyana|Female| 38| Astronomer| Scorpio| Empanadas| NULL| Animation| Moose| Italian| Volunteering| Mad Men| NULL| Milkshake| NULL| IoT| SsangYong| |nicholasreyes@exa...|5848 Williams Tra...| Puerto Rico| Male| 81| Policeman| Aries| Empanadas| Karate| Epic| Goat| German| Wine Tasting| The Sopranos| Brown| Margarita| Gospel|Serverless Computing| Fiat| |stonemichael@exam...|PSC 1170, Box 451...| Isle of Man|Female| 46| Firefighter| Taurus| Pizza| Speed Skating| Biography| Crocodile| French| Bird Watching| Narcos| White| Hot Chocolate| Soul| Metaverse| Mini| |deborahjacobs@exa...|3899 Li Ranch Sui...| Mozambique| Male| 73| Electrician| Virgo| Tamales| Skiing| Silent Film| Giraffe| Uzbek| Fishing| Black Mirror| Purple| Tonic Water| Afrobeats| Bioinformatics| Aston Martin| |gcummings@example...|31591 Williams La...| Bahamas| Male| 22| Dentist| Libra| Fried Rice| Ice Hockey| Sci-Fi| Parrot| Icelandic| Pottery| Breaking Bad| Brown| Mocha| Samba| DevOps| Maserati| | bruce62@example.net|875 Walker Manors...|Slovakia (Slovak ...| Male| 25| Accountant| Gemini| Potato Salad| Curling| Fantasy| Sheep| Polish| Dancing| Ozark| Fuchsia| Wine| Country| Augmented Reality| Audi| |zhangkaren@exampl...|Unit 7845 Box 742...| Serbia| Male| 47| Athlete| Scorpio| Onion Rings| Volleyball| Magical Realism| Koala| Spanish| Woodworking|How I Met Your Mo...| Scarlet| Water| Country| Synthetic Data| BMW| | uromero@example.net|5462 Jenna Mills\...| Japan|Female| 36| Architect| Taurus| Ice Cream| Snowboarding| Suspense| Cat| Pashto| Origami| Homeland| Lime| Gin and Tonic| Synthwave| Digital Twins| McLaren| |kthompson@example...|03287 Silva Pike\...| Hong Kong|Female| 93| Consultant| Gemini| Soup| Speed Skating| Spy| Lion| Hebrew|Puzzle Solving|Buffy the Vampire...| Orange| Root Beer| Dubstep|Brain-Computer In...| Honda| | uyang@example.org|02975 Travis Isle...|Svalbard & Jan Ma...|Female| 71| Athlete| Pisces| Gnocchi| Soccer| Historical| Shark| Italian| Volunteering| The Office| Yellow| Root Beer| Synthwave| Synthetic Data| Citroën| |danielmichael@exa...|Unit 4088 Box 513...| Nigeria| Male| 70|Chef de Cuisine| Aries| Curry| Wrestling| Superhero| Starfish| Indonesian| Board Games| NCIS| Charcoal| Latte| Ska|Privacy-Preservin...| Ferrari| |nortonchelsea@exa...|6295 Heidi Harbor...| American Samoa|Female| 86| Civil Servant| Aquarius| Potato Salad| Hockey| Satire| Bird| Estonian| Knitting| The Sopranos| Lavender| Wine| Post-Rock| Agile Methodologies| Jeep| | bcortez@example.org|30448 White Ville...| Slovenia|Female| 34| Economist| Leo| Risotto| Lacrosse| Epic| Flamingo| Turkish| Woodworking| Black Mirror| Ivory| Matcha| EDM| Agile Methodologies|Mercedes-Benz| |richardsdrew@exam...|78571 Rebecca Lan...| Yemen| Male| 95| Writer| Libra| Burrito| Rugby| Road Movie| Leopard| Irish| Scuba Diving| The Crown| Amethyst| Milk| Techno| Microservices| Jeep| +--------------------+--------------------+--------------------+------+---+---------------+-----------+-------------+--------------+--------------------+---------------+------------------+--------------+--------------------+--------------+--------------+--------------+--------------------+-------------+ only showing top 20 rows
CREATE DATABASE
In [13]:
# Ejemplo de uso
database_name = "delta_spark_database"
create_database(spark_session, database_name)
print()
list_all_databases_and_tables(spark_session)
Database 'delta_spark_database' created or already exists. The following databases and tables are present in the Spark Catalog.
Out[13]:
{'default': [], 'delta_spark_database': []}
SAVE DELTA TABLE USING SPARK SQL ¶
In [14]:
database_sql = "default"
table_sql = "delta_sql"
dataframe_sql = spark_dataframe_sql
sql_warehouse_dir = warehouse_dir
partition_by = ['zodiac_sign']
create_delta_table_with_spark_dataframe_and_register(
spark_session, database_sql, table_sql, dataframe_sql, sql_warehouse_dir, partition_by
)
24/12/30 16:10:43 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
Temporary files at './warehouse-spark/spark_catalog/database/default/delta_sql' have been deleted. Table 'delta_sql' created and registered at './warehouse-spark/spark_catalog/database/default/delta_sql' in database 'default' with partitioning by ['zodiac_sign'].
SAVE DELTA TABLE FROM A SPARK DATAFRAME ¶
In [15]:
database_delta = "delta_spark_database"
table_delta_1 = "delta_dataframe_1"
partition_by = ['zodiac_sign']
table_delta_2 = "delta_dataframe_2"
dataframe_delta_1 = spark_dataframe_delta
dataframe_delta_2 = spark_dataframe_sql
delta_warehouse_dir = warehouse_dir
create_delta_table_in_database(
spark_session, database_delta, table_delta_1, dataframe_delta_1, delta_warehouse_dir, partition_by
)
create_delta_table_in_database(
spark_session, database_delta, table_delta_2, dataframe_delta_2, delta_warehouse_dir
)
Table 'delta_spark_database.delta_dataframe_1' dropped.
Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_1'. Table 'delta_dataframe_1' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_1' with partitioning by ['zodiac_sign']. Table 'delta_spark_database.delta_dataframe_2' dropped.
Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_2'. Table 'delta_dataframe_2' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_2' with partitioning by none.
SAVE DATAFRAME AS DELTA PARQUET ¶
In [16]:
parquet_dataframe = spark_dataframe_sample
file_name = "spark_dataframe_delta"
file_path_delta = f"{base_dir}/spark_files/delta_parquet"
partition_by = ['zodiac_sign']
save_dataframe_as_delta_parquet(spark_dataframe_sample, file_name, file_path_delta, partition_by)
24/12/30 16:10:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/12/30 16:10:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/12/30 16:10:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/12/30 16:10:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/12/30 16:10:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/12/30 16:10:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/12/30 16:10:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/12/30 16:10:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/12/30 16:10:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
DataFrame saved as Delta Parquet at './warehouse-spark/spark_files/delta_parquet/spark_dataframe_delta' with partitioning by ['zodiac_sign'].
SAVE DATAFRAME AS CLASSIC PARQUET (SINGLE FILE) ¶
In [17]:
parquet_dataframe = spark_dataframe_sample
file_name_compact = "spark_dataframe_compact"
file_path_compact_parquet = f"{base_dir}/spark_files/classic_parquet/parquet_clasico/parque_compacto"
save_dataframe_as_parquet(spark_dataframe_sample, file_name_compact, file_path_compact_parquet)
[Stage 81:===========================================> (3 + 1) / 4]
DataFrame saved as Delta Parquet at './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parque_compacto/spark_dataframe_compact' with partitioning by None.
SAVE DATAFRAME AS PARTITIONED CLASSIC PARQUET ¶
In [18]:
parquet_dataframe = spark_dataframe_sample
file_name = "spark_dataframe_patitioned"
file_path_partitioned_parquet = f"{base_dir}/spark_files/classic_parquet/parquet_clasico/parquet_particionado"
partition_by = ['zodiac_sign']
save_dataframe_as_parquet(spark_dataframe_sample, file_name, file_path_partitioned_parquet)
[Stage 82:===========================================> (3 + 1) / 4]
DataFrame saved as Delta Parquet at './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parquet_particionado/spark_dataframe_patitioned' with partitioning by None.
REPARTITION vs REPARTITION BY RANGE vs COALASCE ¶
| Aspect | repartition | coalesce | repartitionByRange |
|---|---|---|---|
| Purpose | Used to increase or decrease the number of partitions by reshuffling data across all partitions. Commonly used when data is not evenly distributed or you need more parallelism. | Reduce the number of partitions by merging adjacent partitions, minimizing data shuffle. Ideal for reducing partitions without incurring the cost of full shuffle. | Redistributes data into partitions based on sorted ranges of specified columns. It can either increase or decrease partitions depending on the number provided. Typically used when you want data ordered or grouped by ranges. |
| Shuffle | Yes, it triggers a full shuffle, meaning data is redistributed across the partitions, which can be computationally expensive. | No shuffle is performed; instead, adjacent partitions are merged, making it more efficient for reducing partitions. | Yes, it triggers a shuffle, but the shuffle is optimized for data to be grouped in ordered ranges, reducing unnecessary shuffling compared to repartition. |
| Computational cost | High, due to the cost of reshuffling the data across partitions. It involves moving data from one partition to another, which can be expensive. | Low, as it avoids shuffling the entire dataset. Instead, it reduces the number of partitions by merging them locally within each partition, without moving data unnecessarily. | Moderate, because although it performs a shuffle, the shuffle is designed to be more efficient for range-based partitioning compared to repartition. |
| Uniform distribution | Yes, repartition generally results in uniform partitioning, as it distributes data evenly across the specified number of partitions. |
No, coalescing does not guarantee uniformity because it only combines adjacent partitions, potentially leaving data unevenly distributed between them. | No, repartitionByRange distributes data based on sorted ranges, which may lead to non-uniform partitions depending on the distribution of data within the range. |
| When to use it | When you need to increase the number of partitions for parallelism or balance the data across partitions (e.g., when you're performing an operation that benefits from more partitions). | When you need to reduce partitions without incurring the cost of a full shuffle. It’s efficient when data is already co-located in the same partition, and you just want to merge them. | When you need to redistribute data based on ranges (e.g., for ordered data, numerical ranges, or categories) and want to control the number of partitions for tasks like sorting or range-based queries. |
In [19]:
dataframe_to_resize = spark_dataframe_sample
REPARTITION¶
In [20]:
# Repartition by a specific number of partitions, in this case 5
df_partitioned = dataframe_to_resize.repartition(5)
# View the resulting partitions
df_partitioned.rdd.glom().map(len).collect()
Out[20]:
[50000, 50000, 50000, 50000, 50000]
REPARTITION BY RANGE¶
In [21]:
from pyspark.sql.functions import col
# Repartition by range using the "age" column
df_partitioned = dataframe_to_resize.repartitionByRange(3, col("age"))
# View the resulting partitions
df_partitioned.rdd.glom().map(len).collect()
Out[21]:
[88335, 85434, 76231]
COALASCE¶
In [22]:
# Coalesce by a specific number of partitions, in this case 3
df_partitioned = dataframe_to_resize.coalesce(3)
# View the resulting partitions
df_partitioned.rdd.glom().map(len).collect()
Out[22]:
[0, 0, 250000]
In [23]:
del dataframe_to_resize
del df_partitioned
gc.collect()
Out[23]:
148
READ PARQUET FILES ¶
DELTA PARQUET¶
In [24]:
delta_parquet = read_parquet_or_delta_file(spark_session, file_path_delta)
display(delta_parquet.toPandas())
del delta_parquet
gc.collect()
Successfully read Parquet file from './warehouse-spark/spark_files/delta_parquet'.
| address | country | sex | age | profession | favorite_food | favorite_sport | favorite_movie_genre | favorite_animal | preferred_language | hobby | favorite_tv_show | favorite_color | favorite_drink | favorite_music | favorite_technology | favorite_car | zodiac_sign | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Out[24]:
20
REGULAR PARQUET¶
COMPACT PARQUET¶
In [25]:
file_parquet = read_parquet_or_delta_file(spark_session, file_path_compact_parquet)
display(file_parquet.toPandas())
del file_parquet
gc.collect()
Successfully read Parquet file from './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parque_compacto'.
| address | country | sex | age | profession | zodiac_sign | favorite_food | favorite_sport | favorite_movie_genre | favorite_animal | preferred_language | hobby | favorite_tv_show | favorite_color | favorite_drink | favorite_music | favorite_technology | favorite_car | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Out[25]:
29
PARTITIONED PARQUET¶
In [26]:
file_parquet = read_parquet_or_delta_file(spark_session, file_path_partitioned_parquet)
display(file_parquet.toPandas())
del file_parquet
gc.collect()
Successfully read Parquet file from './warehouse-spark/spark_files/classic_parquet/parquet_clasico/parquet_particionado'.
| address | country | sex | age | profession | zodiac_sign | favorite_food | favorite_sport | favorite_movie_genre | favorite_animal | preferred_language | hobby | favorite_tv_show | favorite_color | favorite_drink | favorite_music | favorite_technology | favorite_car | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Out[26]:
29
LIST THE TABLES IN THE DATABASE
In [27]:
list_all_databases_and_tables(spark_session)
The following databases and tables are present in the Spark Catalog. Database: default, Table: delta_sql Database: delta_spark_database, Table: delta_dataframe_1 Database: delta_spark_database, Table: delta_dataframe_2
Out[27]:
{'default': ['delta_sql'],
'delta_spark_database': ['delta_dataframe_1', 'delta_dataframe_2']}
In [ ]:
In [ ]:
QUERIES¶
SELECT, WHERE-FILTER, LIMIT
Create Queries Using Spark SQL¶
In [28]:
database = "default"
table = "delta_sql"
query = f"""
SELECT email, hobby, country
FROM {database}.{table}
WHERE AGE <= 18
LIMIT 5
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.show(truncate=False)
spark_dataframe.toPandas()
+---------------------------+---------+--------------+ |email |hobby |country | +---------------------------+---------+--------------+ |esparzalouis@example.net |Traveling|Timor-Leste | |melinda87@example.net |Gaming |Montserrat | |fnelson@example.net |Knitting |American Samoa| |jenniferjohnson@example.net|Chess |New Zealand | |vmiller@example.net |Antiquing|Egypt | +---------------------------+---------+--------------+
Out[28]:
| hobby | country | |
|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
WHERE vs FILTER
from pyspark.sql.functions import col VS import pyspark.sql.functions as F
Create Queries Using Spark DataFrame¶
In [29]:
from pyspark.sql.functions import col
import pyspark.sql.functions as F
# Aplicar el filtro directamente sobre el DataFrame
filtered_dataframe = (
spark_dataframe_sql.select("email", "hobby", "country").filter(col("AGE") <= 18).limit(5)
)
# Mostrar los resultados
filtered_dataframe.toPandas()
Out[29]:
| hobby | country | |
|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [30]:
# Apply the Filter Directly on the DataFrame
filtered_dataframe = (
spark_dataframe_sql.select("email", "hobby", "country").where(F.col("AGE") <= 18).limit(5)
)
# Display the Results
filtered_dataframe.toPandas()
Out[30]:
| hobby | country | |
|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
CTE, SELECT, WHERE, BETWEEN, GROUP BY, HAVING, ORDER BY, LIMIT, OFFSET
In [31]:
database = "delta_spark_database"
table = "delta_dataframe_1"
query = f"""
WITH FOURTY AS (
SELECT *
FROM {database}.{table}
WHERE AGE <> 0
AND AGE BETWEEN 1 AND 99
)
SELECT age, country, zodiac_sign, count(*) as TOTAL
FROM FOURTY
GROUP BY age, country, zodiac_sign
HAVING TOTAL > 0
ORDER BY TOTAL DESC, AGE DESC, COUNTRY
LIMIT 13
OFFSET 31
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[31]:
| age | country | zodiac_sign | TOTAL |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark SQL¶
In [32]:
database = "delta_spark_database"
table = "delta_dataframe_1"
query = f"""
WITH FOURTY AS (
SELECT *
FROM {database}.{table}
WHERE AGE <> 0
AND AGE BETWEEN 1 AND 99
)
SELECT age, country, zodiac_sign, count(*) as TOTAL
FROM FOURTY
GROUP BY age, country, zodiac_sign
HAVING TOTAL > 0
ORDER BY TOTAL DESC, AGE DESC, COUNTRY
LIMIT 13
OFFSET 31
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[32]:
| age | country | zodiac_sign | TOTAL |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Dataframe vs col
Create Queries Using Spark DataFrame¶
In [33]:
# Single Block of Operations
result_df = (
spark_dataframe_delta.filter(
(spark_dataframe_delta["AGE"] > 0) & (spark_dataframe_delta["AGE"].between(1, 99))
) # Filtrar filas donde AGE <> 0 y AGE BETWEEN 1 AND 99
.groupBy("age", "country", "zodiac_sign") # Agrupar por age, country y zodiac_sign
.count() # Contar las filas en cada grupo
.withColumnRenamed("count", "TOTAL") # Renombrar la columna 'count' a 'TOTAL'
.filter("TOTAL > 0") # Filtrar grupos donde TOTAL > 0 (HAVING en SQL)
.orderBy(
["TOTAL", "age", "country"], ascending=[False, False, True]
) # Ordenar por TOTAL y age en orden descendente y por country en orden Ascendente
.offset(31) # Saltar las primeras 31 filas
.limit(13) # Limitar el resultado a 31 filas
)
# Display the Results
result_df.toPandas()
Out[33]:
| age | country | zodiac_sign | TOTAL |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [34]:
# Single Block of Operations
result_df = (
spark_dataframe_delta.filter(
(col("AGE") > 0) & (col("AGE").between(1, 99))
) # Filtrar filas donde AGE <> 0 y AGE BETWEEN 1 AND 99
.groupBy("age", "country", "zodiac_sign") # Agrupar por age, country y zodiac_sign
.count() # Contar las filas en cada grupo
.withColumnRenamed("count", "TOTAL") # Renombrar la columna 'count' a 'TOTAL'
.filter("TOTAL > 0") # Filtrar grupos donde TOTAL > 0 (HAVING en SQL)
.orderBy(
["TOTAL", "age", "country"], ascending=[False, False, True]
) # Ordenar por TOTAL y age en orden descendente y por country en orden Ascendente
.offset(31) # Saltar las primeras 31 filas
.limit(13) # Limitar el resultado a 31 filas
)
# Display the Results
result_df.toPandas()
Out[34]:
| age | country | zodiac_sign | TOTAL |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
AGGREGATE FUNCTIONS
Create Queries Using Spark SQL¶
In [35]:
database = "delta_spark_database"
table = "delta_dataframe_1"
query = f"""
SELECT
SUM(age) AS total_age,
AVG(age) AS average_age,
COUNT(age) AS total_records,
MAX(age) AS max_age,
MIN(age) AS min_age,
APPROX_COUNT_DISTINCT(age) AS distinct_ages,
STDDEV(age) AS stddev_age,
VARIANCE(age) AS variance_age,
SKEWNESS(age) AS skewness_age,
KURTOSIS(age) AS kurtosis_age,
COLLECT_LIST(age) AS age_list,
COLLECT_SET(age) AS unique_ages
FROM {database}.{table}
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[35]:
| total_age | average_age | total_records | max_age | min_age | distinct_ages | stddev_age | variance_age | skewness_age | kurtosis_age | age_list | unique_ages |
|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [36]:
from pyspark.sql.functions import (
sum as spark_sum, avg, count, max as spark_max, min as spark_min,
approx_count_distinct, stddev, variance, skewness, kurtosis, collect_list, collect_set
)
spark_dataframe_delta.agg(
spark_sum("age").alias("total_age"),
avg("age").alias("average_age"),
count("age").alias("total_records"),
spark_max("age").alias("max_age"),
spark_min("age").alias("min_age"),
approx_count_distinct("age").alias("distinct_ages"),
stddev("age").alias("stddev_age"),
variance("age").alias("variance_age"),
skewness("age").alias("skewness_age"),
kurtosis("age").alias("kurtosis_age"),
collect_list("age").alias("age_list"),
collect_set("age").alias("unique_ages")
).toPandas()
Out[36]:
| total_age | average_age | total_records | max_age | min_age | distinct_ages | stddev_age | variance_age | skewness_age | kurtosis_age | age_list | unique_ages |
|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
CASE WHEN - THEN
Create Queries Using Spark SQL¶
In [37]:
database = "delta_spark_database"
table = "delta_dataframe_1"
query = f"""
SELECT
CASE
WHEN age < 1 THEN 'Newborn'
WHEN age >= 1 AND age < 3 THEN 'Infant'
WHEN age >= 3 AND age < 5 THEN 'Toddler'
WHEN age >= 5 AND age < 7 THEN 'Preschooler'
WHEN age >= 7 AND age < 10 THEN 'Early School Age'
WHEN age >= 10 AND age < 14 THEN 'Pre-Adolescent'
WHEN age >= 14 AND age < 18 THEN 'Teenager'
WHEN age >= 18 AND age < 28 THEN 'Young Adult'
WHEN age >= 28 AND age < 40 THEN 'Adult'
WHEN age >= 40 AND age < 50 THEN 'Midlife Adult'
WHEN age >= 50 AND age < 60 THEN 'Experienced Adult'
WHEN age >= 60 AND age < 70 THEN 'Mature Adult'
ELSE 'Senior'
END AS age_category,
*
FROM {database}.{table}
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[37]:
| age_category | address | country | sex | age | profession | zodiac_sign | favorite_food | favorite_sport | favorite_movie_genre | favorite_animal | preferred_language | hobby | favorite_tv_show | favorite_color | favorite_drink | favorite_music | favorite_technology | favorite_car | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [38]:
from pyspark.sql.functions import when
spark_dataframe_delta_case = spark_dataframe_delta.withColumn(
"age_category",
when(spark_dataframe_delta["age"] < 1, "Newborn")
.when((spark_dataframe_delta["age"] >= 1) & (spark_dataframe_delta["age"] < 3), "Infant")
.when((spark_dataframe_delta["age"] >= 3) & (spark_dataframe_delta["age"] < 5), "Toddler")
.when((spark_dataframe_delta["age"] >= 5) & (spark_dataframe_delta["age"] < 7), "Preschooler")
.when((spark_dataframe_delta["age"] >= 7) & (spark_dataframe_delta["age"] < 10), "Early School Age")
.when((spark_dataframe_delta["age"] >= 10) & (spark_dataframe_delta["age"] < 14), "Pre-Adolescent")
.when((spark_dataframe_delta["age"] >= 14) & (spark_dataframe_delta["age"] < 18), "Teenager")
.when((spark_dataframe_delta["age"] >= 18) & (spark_dataframe_delta["age"] < 28), "Young Adult")
.when((spark_dataframe_delta["age"] >= 28) & (spark_dataframe_delta["age"] < 40), "Adult")
.when((spark_dataframe_delta["age"] >= 40) & (spark_dataframe_delta["age"] < 50), "Midlife Adult")
.when((spark_dataframe_delta["age"] >= 50) & (spark_dataframe_delta["age"] < 60), "Experienced Adult")
.when((spark_dataframe_delta["age"] >= 60) & (spark_dataframe_delta["age"] < 70), "Mature Adult")
.otherwise("Senior")
)
# Reorganizar para poner "age_category" en la primera posición
cols = ['age_category'] + [col for col in spark_dataframe_delta_case.columns if col != 'age_category']
spark_dataframe_delta_case.select(cols).toPandas()
Out[38]:
| age_category | address | country | sex | age | profession | zodiac_sign | favorite_food | favorite_sport | favorite_movie_genre | favorite_animal | preferred_language | hobby | favorite_tv_show | favorite_color | favorite_drink | favorite_music | favorite_technology | favorite_car | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
DISTINCT - CAST
Create Queries Using Spark SQL¶
In [39]:
database = "delta_spark_database"
table = "delta_dataframe_1"
query = f"""
SELECT DISTINCT
sex,
country,
FORMAT_NUMBER(CAST(age AS DOUBLE), 2) AS age,
zodiac_sign,
profession
FROM {database}.{table}
ORDER BY sex ASC, country DESC, age ASC, zodiac_sign DESC, profession ASC
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[39]:
| sex | country | age | zodiac_sign | profession |
|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [40]:
from pyspark.sql import functions as F
spark_dataframe_delta.select(
"sex",
"country",
F.format_number(col("age").cast("float"), 2).alias("age"),
"zodiac_sign",
"profession"
) \
.distinct() \
.orderBy(
col("sex").asc(),
col("country").desc(),
col("age").asc(),
col("zodiac_sign").desc(),
col("profession").asc()
).toPandas()
Out[40]:
| sex | country | age | zodiac_sign | profession |
|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
CREATE DATASET FOR SHORT EXAMPLES
In [41]:
# Explicit Schema with the Desired Order
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, MapType
from pyspark.sql.functions import col, struct
schema = StructType(
[
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("related_id", IntegerType(), True),
StructField("zodiac_sign", StringType(), True),
StructField("country", StringType(), True),
StructField("values", ArrayType(IntegerType()), True),
StructField("fruits_and_vitamins", MapType(StringType(), StringType()), True)
]
)
list_data_dict = [
{"id": 1, "name": "Nathalie", "age": 1, "related_id": 11, "zodiac_sign": "Capricorn", "country": "Colombia", "values": [1, 2, 3, 4, 5], "fruits_and_vitamins": {"Apple": "Vitamin C", "Banana": "Vitamin B6", "Orange": "Vitamin C"}},
{"id": 2, "name": "Cora", "age": 3, "related_id": 5, "zodiac_sign": "Taurus", "country": "USA", "values": [6, 7, 8, 9, 10], "fruits_and_vitamins": {"Mango": "Vitamin A", "Grapes": "Vitamin K", "Papaya": "Vitamin C"}},
{"id": 3, "name": "Gaby", "age": 5, "related_id": 14, "zodiac_sign": "Gemini", "country": "Colombia", "values": [11, 12, 13, 14, 15], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Pineapple": "Vitamin A", "Strawberry": "Vitamin C"}},
{"id": 4, "name": "Muneca", "age": 7, "related_id": 6, "zodiac_sign": "Cancer", "country": "Portugal", "values": [16, 17, 18, 19, 20], "fruits_and_vitamins": {"Peach": "Vitamin A", "Blueberry": "Vitamin C", "Watermelon": "Vitamin A"}},
{"id": 5, "name": "Principe", "age": 9, "related_id": 2, "zodiac_sign": "Leo", "country": "Argentina", "values": [21, 22, 23, 24, 25], "fruits_and_vitamins": {"Avocado": "Vitamin E", "Apple": "Vitamin C", "Grapefruit": "Vitamin C"}},
{"id": 6, "name": "Ana", "age": 11, "related_id": 3, "zodiac_sign": "Virgo", "country": "Colombia", "values": [26, 27, 28, 29, 30], "fruits_and_vitamins": {"Pineapple": "Vitamin C", "Mango": "Vitamin A", "Banana": "Vitamin B6"}},
{"id": 7, "name": "Cecilia", "age": 13, "related_id": 20, "zodiac_sign": "Libra", "country": "Colombia", "values": [31, 32, 33, 34, 35], "fruits_and_vitamins": {"Watermelon": "Vitamin C", "Apple": "Vitamin C", "Orange": "Vitamin C"}},
{"id": 8, "name": "Lucia", "age": 15, "related_id": 13, "zodiac_sign": "Scorpio", "country": "Peru", "values": [36, 37, 38, 39, 40], "fruits_and_vitamins": {"Peach": "Vitamin A", "Mango": "Vitamin A", "Pineapple": "Vitamin C"}},
{"id": 9, "name": "Zeus", "age": 17, "related_id": 7, "zodiac_sign": "Sagittarius", "country": "Mexico", "values": [41, 42, 43, 44, 45], "fruits_and_vitamins": {"Strawberry": "Vitamin C", "Orange": "Vitamin C", "Papaya": "Vitamin C"}},
{"id": 10, "name": "Guadalupe", "age": 15, "related_id": 17, "zodiac_sign": "Aries", "country": "Colombia", "values": [46, 47, 48, 49, 50], "fruits_and_vitamins": {"Blueberry": "Vitamin C", "Banana": "Vitamin B6", "Grapes": "Vitamin K"}},
{"id": 21, "name": "Spark", "age": 3, "related_id": 22, "zodiac_sign": "Aquarius", "country": "Portugal", "values": [51, 52, 53, 54, 55], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Papaya": "Vitamin C", "Mango": "Vitamin A"}},
{"id": 22, "name": "Delta", "age": 7, "related_id": 21, "zodiac_sign": "Pisces", "country": "Rusia", "values": [56, 57, 58, 59, 60], "fruits_and_vitamins": {"Apple": "Vitamin C", "Banana": "Vitamin B6", "Strawberry": "Vitamin C"}},
{"id": 11, "name": "Augusto", "age": 17, "related_id": 1, "zodiac_sign": "Aries", "country": "Argentina", "values": [61, 62, 63, 64, 65], "fruits_and_vitamins": {"Peach": "Vitamin A", "Grapefruit": "Vitamin C", "Mango": "Vitamin A"}},
{"id": 12, "name": "Muiscas", "age": 13, "related_id": 16, "zodiac_sign": "Taurus", "country": "Portugal", "values": [66, 67, 68, 69, 70], "fruits_and_vitamins": {"Watermelon": "Vitamin A", "Orange": "Vitamin C", "Papaya": "Vitamin C"}},
{"id": 13, "name": "Jorge", "age": 11, "related_id": 8, "zodiac_sign": "Gemini", "country": "USA", "values": [71, 72, 73, 74, 75], "fruits_and_vitamins": {"Strawberry": "Vitamin C", "Apple": "Vitamin C", "Kiwi": "Vitamin C"}},
{"id": 14, "name": "Sandra", "age": 9, "related_id": 4, "zodiac_sign": "Cancer", "country": "Argentina", "values": [76, 77, 78, 79, 80], "fruits_and_vitamins": {"Grapes": "Vitamin K", "Orange": "Vitamin C", "Blueberry": "Vitamin C"}},
{"id": 15, "name": "Carlos", "age": 2, "related_id": 18, "zodiac_sign": "Leo", "country": "Peru", "values": [81, 82, 83, 84, 85], "fruits_and_vitamins": {"Watermelon": "Vitamin C", "Mango": "Vitamin A", "Apple": "Vitamin C"}},
{"id": 16, "name": "Isabel", "age": 4, "related_id": 12, "zodiac_sign": "Virgo", "country": "Mexico", "values": [86, 87, 88, 89, 90], "fruits_and_vitamins": {"Papaya": "Vitamin C", "Kiwi": "Vitamin C", "Pineapple": "Vitamin A"}},
{"id": 17, "name": "Paola", "age": 6, "related_id": 9, "zodiac_sign": "Libra", "country": "Colombia", "values": [91, 92, 93, 94, 95], "fruits_and_vitamins": {"Strawberry": "Vitamin C", "Grapefruit": "Vitamin C", "Banana": "Vitamin B6"}},
{"id": 18, "name": "David", "age": 8, "related_id": 15, "zodiac_sign": "Scorpio", "country": "Mexico", "values": [96, 97, 98, 99, 100], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Papaya": "Vitamin C", "Apple": "Vitamin C"}},
{"id": 19, "name": "Sara", "age": 10, "related_id": 19, "zodiac_sign": "Sagittarius", "country": "Colombia", "values": [101, 102, 103, 104, 105], "fruits_and_vitamins": {"Pineapple": "Vitamin C", "Orange": "Vitamin C", "Mango": "Vitamin A"}},
{"id": 20, "name": "Claudia", "age": 12, "related_id": 10, "zodiac_sign": "Capricorn", "country": "Mexico", "values": [106, 107, 108, 109, 110], "fruits_and_vitamins": {"Peach": "Vitamin A", "Apple": "Vitamin C", "Grapefruit": "Vitamin C"}},
{"id": 21, "name": "Spark", "age": 3, "related_id": 22, "zodiac_sign": "Aquarius", "country": "Portugal", "values": [51, 52, 53, 54, 55], "fruits_and_vitamins": {"Kiwi": "Vitamin C", "Papaya": "Vitamin C", "Mango": "Vitamin A"}},
{"id": 22, "name": "Delta", "age": 7, "related_id": 21, "zodiac_sign": "Pisces", "country": "Rusia", "values": [56, 57, 58, 59, 60], "fruits_and_vitamins": {"Apple": "Vitamin C", "Banana": "Vitamin B6", "Strawberry": "Vitamin C"}},
]
# Create DataFrame with the Explicit Schema
spark_dataframe_joins_full = spark_session.createDataFrame(list_data_dict, schema)
# Cache the DataFrame to Avoid Recomputing in Subsequent Operations, Improving Performance.
spark_dataframe_joins_full.cache()
# Display the DataFrame with the Correct Order
display(spark_dataframe_joins_full.toPandas())
# Remove the HashMap Column to Avoid Issues with Union, Except, etc.
spark_dataframe_joins = spark_dataframe_joins_full.drop("fruits_and_vitamins")
# Cache the DataFrame to Prevent Recalculation in Subsequent Operations, Enhancing Performance.
spark_dataframe_joins.cache()
# Display the DataFrame with the Correct Order
display(spark_dataframe_joins.toPandas())
Decompose List and HashMap into Multiple Rows and Combine Them into New Rows¶
In [42]:
from pyspark.sql.functions import lit, explode
from pyspark.sql.window import Window
spark_dataframe_explode = spark_dataframe_joins_full \
.withColumn("row_number", F.row_number().over(Window.partitionBy(col("country")).orderBy(lit(1)))) \
.withColumn("value", explode(col("values"))) \
.withColumn("fruit_vitamin", explode(F.map_entries(col("fruits_and_vitamins")))) \
.select(
"*",
col("fruit_vitamin.key").alias("fruit"),
col("fruit_vitamin.value").alias("vitamin")
) \
.drop("fruit_vitamin")
# Display the Spark DataFrame in Pandas Version
spark_dataframe_explode.toPandas()
Out[42]:
| id | name | age | related_id | zodiac_sign | country | values | fruits_and_vitamins | row_number | value | fruit | vitamin |
|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Multiple Datasets¶
In [43]:
num_parts = 2
spark_dataframe_uno, spark_dataframe_dos = split_spark_dataframe(spark_dataframe_joins, num_parts)
# Cache the DataFrame to prevent recalculation in subsequent operations, improving performance.
spark_dataframe_uno.cache()
spark_dataframe_dos.cache()
display(spark_dataframe_uno.toPandas(), spark_dataframe_dos.toPandas())
CROSS TAB¶
In [44]:
# Perform the crosstab between the 2 columns
df_crosstab = spark_dataframe_joins.crosstab("age", "zodiac_sign")
df_crosstab.toPandas()
Out[44]:
| age_zodiac_sign | Aquarius | Aries | Cancer | Capricorn | Gemini | Leo | Libra | Pisces | Sagittarius | Scorpio | Taurus | Virgo |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Add Total Sum Column¶
In [45]:
from pyspark.sql import functions as F
# Calculate the sum of all columns and add the 'total' column
df_crosstab_with_total = df_crosstab.select(
"*", # Keep all columns from the crosstab
sum(F.coalesce(F.col(col), F.lit(0)) for col in df_crosstab.columns[1:]).alias(
"total"
),
)
# Sort the DataFrame in descending order by the 'total' column
df_crosstab_with_total_sorted = df_crosstab_with_total.orderBy(
["total", "age_zodiac_sign"], ascending=[False, True]
)
# Convert the result to a Pandas DataFrame if necessary
df_crosstab_with_total_sorted.toPandas()
Out[45]:
| age_zodiac_sign | Aquarius | Aries | Cancer | Capricorn | Gemini | Leo | Libra | Pisces | Sagittarius | Scorpio | Taurus | Virgo | total |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create New Tables¶
In [46]:
database_delta = "delta_spark_database"
table_delta_all = "delta_dataframe_all"
table_delta_uno = "delta_dataframe_uno"
table_delta_dos = "delta_dataframe_dos"
dataframe_delta_all = spark_dataframe_joins
dataframe_delta_1 = spark_dataframe_uno
dataframe_delta_2 = spark_dataframe_dos
delta_warehouse_dir = warehouse_dir
partition_by = ['age']
create_delta_table_in_database(
spark_session,
database_delta,
table_delta_all,
dataframe_delta_all,
delta_warehouse_dir,
partition_by
)
create_delta_table_in_database(
spark_session,
database_delta,
table_delta_uno,
dataframe_delta_1,
delta_warehouse_dir,
partition_by
)
create_delta_table_in_database(
spark_session,
database_delta,
table_delta_dos,
dataframe_delta_2,
delta_warehouse_dir,
partition_by
)
list_all_databases_and_tables(spark_session)
Table 'delta_spark_database.delta_dataframe_all' dropped.
24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 84.44% for 9 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 76.00% for 10 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 69.09% for 11 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 63.33% for 12 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 58.46% for 13 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 54.29% for 14 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 50.67% for 15 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 54.29% for 14 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 58.46% for 13 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 63.33% for 12 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 69.09% for 11 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 76.00% for 10 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 84.44% for 9 writers 24/12/30 16:11:38 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory Scaling row group sizes to 95.00% for 8 writers
Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_all'. Table 'delta_dataframe_all' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_all' with partitioning by ['age']. Table 'delta_spark_database.delta_dataframe_uno' dropped. Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_uno'. Table 'delta_dataframe_uno' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_uno' with partitioning by ['age']. Table 'delta_spark_database.delta_dataframe_dos' dropped. Property 'delta.enableChangeDataFeed' enabled for table 'delta_spark_database.delta_dataframe_dos'. Table 'delta_dataframe_dos' created in database 'delta_spark_database' at './warehouse-spark/spark_catalog/database/delta_spark_database/delta_dataframe_dos' with partitioning by ['age']. The following databases and tables are present in the Spark Catalog. Database: default, Table: delta_sql Database: delta_spark_database, Table: delta_dataframe_1 Database: delta_spark_database, Table: delta_dataframe_2 Database: delta_spark_database, Table: delta_dataframe_all Database: delta_spark_database, Table: delta_dataframe_dos Database: delta_spark_database, Table: delta_dataframe_uno
Out[46]:
{'default': ['delta_sql'],
'delta_spark_database': ['delta_dataframe_1',
'delta_dataframe_2',
'delta_dataframe_all',
'delta_dataframe_dos',
'delta_dataframe_uno']}
WINDOW FUNCTIONS
Create Queries Using Spark SQL¶
In [47]:
database = "delta_spark_database"
table = "delta_dataframe_all"
query = f"""
SELECT
country, -- Country of residence
age, -- Age of the user
-- Window functions applied to age and country
ROW_NUMBER() OVER (PARTITION BY country ORDER BY age DESC) AS row_number, -- Assigns a row number based on age within the country
COUNT(*) OVER (PARTITION BY country) AS count_by_country, -- Counts the number of users by country
RANK() OVER (PARTITION BY country ORDER BY age DESC) AS rank_by_country, -- Assigns a rank based on age within each country, considering ties
DENSE_RANK() OVER (PARTITION BY country ORDER BY age DESC) AS dense_rank_by_country, -- Dense rank by country, no gaps in rank in case of ties
MIN(age) OVER (PARTITION BY country) AS min_age_by_country, -- Minimum age within each country
MAX(age) OVER (PARTITION BY country) AS max_age_by_country, -- Maximum age within each country
SUM(age) OVER (PARTITION BY country) AS sum_age_by_country, -- Sum of ages within each country
AVG(age) OVER (PARTITION BY country) AS avg_age_by_country, -- Average age within each country
-- Grouping functions for age
NTILE(3) OVER (PARTITION BY country ORDER BY age DESC) AS ntile_by_country, -- Divides users into quartiles based on age within each country
zodiac_sign, -- Zodiac sign of the user
LAG(zodiac_sign, 1) OVER (PARTITION BY country ORDER BY age DESC) AS lag_zodiac_sign, -- Zodiac sign of the previous user within each country
LEAD(zodiac_sign, 1) OVER (PARTITION BY country ORDER BY age DESC) AS lead_zodiac_sign, -- Zodiac sign of the next user within each country
FIRST_VALUE(zodiac_sign) OVER (PARTITION BY country ORDER BY age DESC) AS first_zodiac_sign, -- First zodiac sign recorded within each country
LAST_VALUE(zodiac_sign) OVER (PARTITION BY country ORDER BY age DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_zodiac_sign -- Last zodiac sign recorded within each country, considering the entire partition
FROM {database}.{table}
ORDER BY country ASC
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[47]:
| country | age | row_number | count_by_country | rank_by_country | dense_rank_by_country | min_age_by_country | max_age_by_country | sum_age_by_country | avg_age_by_country | ntile_by_country | zodiac_sign | lag_zodiac_sign | lead_zodiac_sign | first_zodiac_sign | last_zodiac_sign |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [48]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Apply all operations in a single statement
spark_dataframe_joins.select(
"country",
"age",
F.row_number().over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("row_number"),
F.count("*").over(Window.partitionBy("country")).alias("count_by_country"),
F.rank().over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("rank_by_country"),
F.dense_rank().over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("dense_rank_by_country"),
F.min("age").over(Window.partitionBy("country")).alias("min_age_by_country"),
F.max("age").over(Window.partitionBy("country")).alias("max_age_by_country"),
F.sum("age").over(Window.partitionBy("country")).alias("sum_age_by_country"),
F.avg("age").over(Window.partitionBy("country")).alias("avg_age_by_country"),
F.ntile(3).over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("ntile_by_country"),
"zodiac_sign",
F.lag("zodiac_sign", 1).over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("lag_zodiac_sign"),
F.lead("zodiac_sign", 1).over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("lead_zodiac_sign"),
F.first("zodiac_sign").over(Window.partitionBy("country").orderBy(F.col("age").desc())).alias("first_zodiac_sign"),
F.last("zodiac_sign").over(
Window.partitionBy("country")
.orderBy(F.col("age").desc())
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
).alias("last_zodiac_sign")
).orderBy("country", "row_number").toPandas()
Out[48]:
| country | age | row_number | count_by_country | rank_by_country | dense_rank_by_country | min_age_by_country | max_age_by_country | sum_age_by_country | avg_age_by_country | ntile_by_country | zodiac_sign | lag_zodiac_sign | lead_zodiac_sign | first_zodiac_sign | last_zodiac_sign |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
CONCATENATE VALUES AND CREATE ADDITIONAL COLUMNS
Create Queries Using Spark SQL¶
In [49]:
database = "delta_spark_database"
table = "delta_dataframe_all"
query = f"""
WITH NumberedRows AS (
SELECT
ROW_NUMBER() OVER (PARTITION BY country ORDER BY id) AS row_number, -- Assigns consecutive numbers for each country
(id + age + related_id) AS SUM_NUMBERS, -- Sums the numeric columns row by row
CONCAT(name, ' ', zodiac_sign, ' ', country) AS CONCATENATE_STRINGS, -- Concatenates the strings with spaces
values
FROM {database}.{table}
)
SELECT * FROM NumberedRows;
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[49]:
| row_number | SUM_NUMBERS | CONCATENATE_STRINGS | values |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [50]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Create processed DataFrame
spark_dataframe_joins \
.withColumn("row_number", F.row_number().over(Window.partitionBy(F.col("country")).orderBy(F.lit(1)))) \
.select(
"row_number",
(col("id") + F.col("age") + col("related_id")).alias("SUM_NUMBERS"),
F.concat_ws(" ", col("name"), col("zodiac_sign"), col("country")).alias("CONCATENATE_STRINGS"),
col("values")
) \
.toPandas()
Out[50]:
| row_number | SUM_NUMBERS | CONCATENATE_STRINGS | values |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
SEPARATE THE LIST VALUES FOR EACH INDIVIDUAL RECORD¶
Create Queries Using Spark SQL¶
In [51]:
database = "delta_spark_database"
table = "delta_dataframe_all"
query = f"""
WITH NumberedRows AS (
SELECT
ROW_NUMBER() OVER (PARTITION BY country ORDER BY id) AS row_number, -- Assigns consecutive numbers for each country
(id + age + related_id) AS SUM_NUMBERS, -- Sums the numeric columns row by row
CONCAT(name, ' ', zodiac_sign, ' ', country) AS CONCATENATE_STRINGS, -- Concatenates the strings with spaces
explode(values) AS value -- Expands the array elements
FROM {database}.{table}
)
SELECT * FROM NumberedRows;
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[51]:
| row_number | SUM_NUMBERS | CONCATENATE_STRINGS | value |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [52]:
from pyspark.sql.functions import explode
# Create processed DataFrame
spark_dataframe_joins \
.withColumn("row_number", F.row_number().over(Window.partitionBy(col("country")).orderBy(F.lit(1)))) \
.select(
"row_number", # Row number for each country partition
(col("id") + col("age") + col("related_id")).alias("SUM_NUMBERS"), # Sum of numeric columns for each row
F.concat_ws(" ", col("name"), col("zodiac_sign"), col("country")).alias("CONCATENATE_STRINGS"), # Concatenate strings with space
explode(col("values")).alias("value") # Explode array elements into separate rows
) \
.toPandas() # Convert the result to Pandas DataFrame
Out[52]:
| row_number | SUM_NUMBERS | CONCATENATE_STRINGS | value |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
OPERACIONES DE CONJUNTOS
UNION
Create Queries Using Spark SQL¶
In [53]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT *
FROM {database}.{table1}
UNION
SELECT *
FROM {database}.{table2}
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[53]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [54]:
spark_dataframe_uno.union(spark_dataframe_dos).toPandas()
Out[54]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
UNION ALL
Create Queries Using Spark SQL¶
In [55]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT *
FROM {database}.{table1}
UNION ALL
SELECT *
FROM {database}.{table2}
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[55]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [56]:
spark_dataframe_uno.unionByName(spark_dataframe_dos).toPandas()
Out[56]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
INTERCEPT
Create Queries Using Spark SQL¶
In [57]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT *
FROM {database}.{table1}
INTERSECT
SELECT *
FROM {database}.{table2}
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[57]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [58]:
spark_dataframe_uno.intersect(spark_dataframe_dos).toPandas()
Out[58]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
EXCEPT
Create Queries Using Spark SQL¶
In [59]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT *
FROM {database}.{table1}
EXCEPT
SELECT *
FROM {database}.{table2}
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[59]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [60]:
spark_dataframe_uno.exceptAll(spark_dataframe_dos).toPandas()
Out[60]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [61]:
spark_dataframe_uno.subtract(spark_dataframe_dos).toPandas()
Out[61]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
RENAME DATAFRAMES COLUMNS FOR EXAMPLES
SELF JOIN
Create Queries Using Spark SQL¶
In [62]:
database = "delta_spark_database"
table = "delta_dataframe_uno"
query = f"""
SELECT *
FROM {database}.{table} AS TABLE_UNO_1
INNER JOIN {database}.{table} AS TABLE_UNO_2
ON TABLE_UNO_1.id = TABLE_UNO_2.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[62]:
| id | name | age | related_id | zodiac_sign | country | values | id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [63]:
from pyspark.sql import functions as F
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]
# Perform the self join
spark_dataframe_uno.alias("spark_dataframe_uno") \
.join(
spark_dataframe_dos.alias("spark_dataframe_dos"),
col("spark_dataframe_uno.id") == col("spark_dataframe_dos.related_id"),
"inner"
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas() # Convert the result to Pandas DataFrame
Out[63]:
| spark_dataframe_uno.id | spark_dataframe_uno.name | spark_dataframe_uno.age | spark_dataframe_uno.related_id | spark_dataframe_uno.zodiac_sign | spark_dataframe_uno.country | spark_dataframe_uno.values | spark_dataframe_dos.id | spark_dataframe_dos.name | spark_dataframe_dos.age | spark_dataframe_dos.related_id | spark_dataframe_dos.zodiac_sign | spark_dataframe_dos.country | spark_dataframe_dos.values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
INNER JOIN
Create Queries Using Spark SQL¶
In [64]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
INNER JOIN {database}.{table2} AS TABLE_DOS
ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[64]:
| id | name | age | related_id | zodiac_sign | country | values | id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [65]:
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]
spark_dataframe_uno.alias("spark_dataframe_uno").join(
spark_dataframe_dos.alias("spark_dataframe_dos"),
col("spark_dataframe_uno.id") == col("spark_dataframe_dos.related_id"),
"inner"
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas() # Convert the result to Pandas DataFrame
Out[65]:
| spark_dataframe_uno.id | spark_dataframe_uno.name | spark_dataframe_uno.age | spark_dataframe_uno.related_id | spark_dataframe_uno.zodiac_sign | spark_dataframe_uno.country | spark_dataframe_uno.values | spark_dataframe_dos.id | spark_dataframe_dos.name | spark_dataframe_dos.age | spark_dataframe_dos.related_id | spark_dataframe_dos.zodiac_sign | spark_dataframe_dos.country | spark_dataframe_dos.values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
LEFT JOIN
Create Queries Using Spark SQL¶
In [66]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
LEFT OUTER JOIN {database}.{table2} AS TABLE_DOS
ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[66]:
| id | name | age | related_id | zodiac_sign | country | values | id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [67]:
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]
spark_dataframe_uno.alias("spark_dataframe_uno") \
.join(
spark_dataframe_dos.alias("spark_dataframe_dos"),
col("spark_dataframe_uno.id") == col("spark_dataframe_dos.related_id"),
"left"
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas() # Convert the result to a Pandas DataFrame
Out[67]:
| spark_dataframe_uno.id | spark_dataframe_uno.name | spark_dataframe_uno.age | spark_dataframe_uno.related_id | spark_dataframe_uno.zodiac_sign | spark_dataframe_uno.country | spark_dataframe_uno.values | spark_dataframe_dos.id | spark_dataframe_dos.name | spark_dataframe_dos.age | spark_dataframe_dos.related_id | spark_dataframe_dos.zodiac_sign | spark_dataframe_dos.country | spark_dataframe_dos.values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
ANTI LEFT JOIN
Create Queries Using Spark SQL¶
In [68]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
LEFT ANTI JOIN {database}.{table2} AS TABLE_DOS
ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[68]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [69]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT TABLE_UNO.*
FROM {database}.{table1} AS TABLE_UNO
LEFT OUTER JOIN {database}.{table2} AS TABLE_DOS
ON TABLE_UNO.id = TABLE_DOS.related_id
WHERE TABLE_DOS.related_id IS NULL
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[69]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [70]:
spark_dataframe_uno.alias("spark_dataframe_uno") \
.join(
spark_dataframe_dos.alias("spark_dataframe_dos"),
col("spark_dataframe_uno.id") == col("spark_dataframe_dos.related_id"),
"left_anti"
) \
.toPandas()
Out[70]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
RIGHT JOIN
Create Queries Using Spark SQL¶
In [71]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
RIGHT OUTER JOIN {database}.{table2} AS TABLE_DOS
ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[71]:
| id | name | age | related_id | zodiac_sign | country | values | id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [72]:
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]
spark_dataframe_uno.alias("spark_dataframe_uno") \
.join(
spark_dataframe_dos.alias("spark_dataframe_dos"),
col("spark_dataframe_uno.id") == col("spark_dataframe_dos.related_id"),
"right"
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas() # Convert the result to a Pandas DataFrame
Out[72]:
| spark_dataframe_uno.id | spark_dataframe_uno.name | spark_dataframe_uno.age | spark_dataframe_uno.related_id | spark_dataframe_uno.zodiac_sign | spark_dataframe_uno.country | spark_dataframe_uno.values | spark_dataframe_dos.id | spark_dataframe_dos.name | spark_dataframe_dos.age | spark_dataframe_dos.related_id | spark_dataframe_dos.zodiac_sign | spark_dataframe_dos.country | spark_dataframe_dos.values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
ANTI RIGHT JOIN
Create Queries Using Spark SQL¶
In [73]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT TABLE_DOS.*
FROM {database}.{table1} AS TABLE_UNO
RIGHT OUTER JOIN {database}.{table2} AS TABLE_DOS
ON TABLE_UNO.id = TABLE_DOS.related_id
WHERE TABLE_UNO.id IS NULL
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[73]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [74]:
spark_dataframe_uno.alias("spark_dataframe_uno") \
.join(
spark_dataframe_dos.alias("spark_dataframe_dos"),
col("spark_dataframe_uno.id") == col("spark_dataframe_dos.related_id"),
"right"
) \
.filter(col("spark_dataframe_uno.id").isNull()) \
.select([F.col(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]) \
.toPandas()
Out[74]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
INVIRTIENDO LAS TABLAS Y USANDO EL ANTILEFT JOIN¶
In [75]:
from pyspark.sql.functions import col
spark_dataframe_dos.alias("spark_dataframe_dos") \
.join(
spark_dataframe_uno.alias("spark_dataframe_uno"),
col("spark_dataframe_dos.id") == col("spark_dataframe_uno.related_id"),
"left_anti"
) \
.toPandas()
Out[75]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
OUTER JOIN
Create Queries Using Spark SQL¶
In [76]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
FULL OUTER JOIN {database}.{table2} AS TABLE_DOS
ON TABLE_UNO.id = TABLE_DOS.related_id
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[76]:
| id | name | age | related_id | zodiac_sign | country | values | id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [77]:
from pyspark.sql.functions import col
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]
# Perform the join with aliases and select all columns automatically
spark_dataframe_uno.alias("spark_dataframe_uno") \
.join(
spark_dataframe_dos.alias("spark_dataframe_dos"),
col("spark_dataframe_uno.id") == col("spark_dataframe_dos.related_id"),
"outer"
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas() # Convert the result to a Pandas DataFrame
Out[77]:
| spark_dataframe_uno.id | spark_dataframe_uno.name | spark_dataframe_uno.age | spark_dataframe_uno.related_id | spark_dataframe_uno.zodiac_sign | spark_dataframe_uno.country | spark_dataframe_uno.values | spark_dataframe_dos.id | spark_dataframe_dos.name | spark_dataframe_dos.age | spark_dataframe_dos.related_id | spark_dataframe_dos.zodiac_sign | spark_dataframe_dos.country | spark_dataframe_dos.values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
ANTI OUTER JOIN
Create Queries Using Spark SQL¶
In [78]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT
TABLE_UNO.*,
TABLE_DOS.*
FROM {database}.{table1} AS TABLE_UNO
FULL OUTER JOIN {database}.{table2} AS TABLE_DOS
ON TABLE_UNO.id = TABLE_DOS.related_id
WHERE TABLE_UNO.id IS NULL
OR TABLE_DOS.related_id IS NULL
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[78]:
| id | name | age | related_id | zodiac_sign | country | values | id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [79]:
from pyspark.sql.functions import col
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]
spark_dataframe_uno.alias("spark_dataframe_uno") \
.join(
spark_dataframe_dos.alias("spark_dataframe_dos"), # Alias for spark_dataframe_dos
col("spark_dataframe_uno.id") == col("spark_dataframe_dos.related_id"), # Join condition
"outer" # Join type: OUTER JOIN
) \
.filter(
(
(col("spark_dataframe_uno.id").isNotNull() & col("spark_dataframe_dos.related_id").isNull()) # Condition when spark_dataframe_uno.id is not null and spark_dataframe_dos.related_id is null
| (col("spark_dataframe_uno.id").isNull() & col("spark_dataframe_dos.related_id").isNotNull()) # Condition when spark_dataframe_uno.id is null and spark_dataframe_dos.related_id is not null
)
) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas()
Out[79]:
| spark_dataframe_uno.id | spark_dataframe_uno.name | spark_dataframe_uno.age | spark_dataframe_uno.related_id | spark_dataframe_uno.zodiac_sign | spark_dataframe_uno.country | spark_dataframe_uno.values | spark_dataframe_dos.id | spark_dataframe_dos.name | spark_dataframe_dos.age | spark_dataframe_dos.related_id | spark_dataframe_dos.zodiac_sign | spark_dataframe_dos.country | spark_dataframe_dos.values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
CROSS JOIN
Create Queries Using Spark SQL¶
In [80]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
# RESTORE TABLE {database}.{table1} TO VERSION AS OF {version};
query = f"""
SELECT * FROM {database}.{table1}
CROSS JOIN
{database}.{table2}
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[80]:
| id | name | age | related_id | zodiac_sign | country | values | id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [81]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
table2 = "delta_dataframe_dos"
query = f"""
SELECT *
FROM {database}.{table1} AS TABLE_UNO
CROSS JOIN {database}.{table2} AS TABLE_DOS
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[81]:
| id | name | age | related_id | zodiac_sign | country | values | id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Create Queries Using Spark DataFrame¶
In [82]:
from pyspark.sql import functions as F
# Get all column names from both DataFrames
spark_dataframe_uno_columns = [F.col(f"spark_dataframe_uno.{col}").alias(f"spark_dataframe_uno.{col}") for col in spark_dataframe_uno.columns]
spark_dataframe_dos_columns = [F.col(f"spark_dataframe_dos.{col}").alias(f"spark_dataframe_dos.{col}") for col in spark_dataframe_dos.columns]
# Perform the CROSS JOIN with aliases and automatically select all columns
spark_dataframe_uno.alias("spark_dataframe_uno") \
.crossJoin(spark_dataframe_dos.alias("spark_dataframe_dos")) \
.select(*spark_dataframe_uno_columns, *spark_dataframe_dos_columns) \
.toPandas() # Convert the result to a Pandas DataFrame
Out[82]:
| spark_dataframe_uno.id | spark_dataframe_uno.name | spark_dataframe_uno.age | spark_dataframe_uno.related_id | spark_dataframe_uno.zodiac_sign | spark_dataframe_uno.country | spark_dataframe_uno.values | spark_dataframe_dos.id | spark_dataframe_dos.name | spark_dataframe_dos.age | spark_dataframe_dos.related_id | spark_dataframe_dos.zodiac_sign | spark_dataframe_dos.country | spark_dataframe_dos.values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
UDFs (User Defined Functions)
CREATE A PYTHON FUNCION AND REGISTER LIKE UDF FUNCTION¶
| Section | Content |
|---|---|
| What are UDFs? | UDFs (User Defined Functions) are custom functions created by developers to perform specific operations in distributed environments like Spark, extending its capabilities when native functions are not sufficient. |
| Advantages | - Flexibility: Enables custom logic. - Compatibility: Integrates seamlessly with DataFrames and Datasets. - Reusability: Can be reused across multiple projects. |
| Disadvantages | - Performance: Slower due to serialization and lack of optimization. - Debugging: Harder to maintain and troubleshoot. |
| Types of UDFs | 1. Scala UDFs: More efficient, executed directly in the JVM. 2. Python UDFs: Less efficient, suitable for complex logic. 3. Pandas UDFs: Operate on vectors, better for intensive computations. |
| Best Practices | 1. Avoid UDFs when possible: Prefer native functions. 2. Use Pandas UDFs if necessary: For vectorized processing in PySpark. 3. Define data types: Ensure clarity and prevent errors. 4. Minimize logic in UDFs: Keep it simple. 5. Testing and monitoring: Validate with real data. |
| Alternatives to UDFs | 1. Native Spark functions: Use methods like filter, groupBy, and agg. 2. Column Expressions: Perform transformations using expressions like col and expr. |
| Conclusion | UDFs are useful for customizing logic in Spark but should be used sparingly to prioritize performance and scalability. Always consider native or more efficient alternatives before resorting to UDFs. |
| Register UDF Function for DataFrames | Step 1: Define a custom function in Python or Scala. Step 2: Register the function as a UDF using udf and specify the return type. Step 3: Apply the UDF to DataFrame columns using methods like withColumn. Python Example: from pyspark.sql.functions import udf from pyspark.sql.types import DoubleType def divide_by_1000(value): return value / 1000 if value is not None else None divide_by_1000_udf = udf(divide_by_1000, DoubleType()) df_with_converted = df.withColumn("converted_value", divide_by_1000_udf(df["value"])) df_with_converted.show() Scala Example: import org.apache.spark.sql.functions.udf val divideBy1000 = (value: java.lang.Double) => { if (value != null) value / 1000 else null } val divideBy1000UDF = udf(divideBy1000) val dfWithConverted = df.withColumn("converted_value", divideBy1000UDF($"value")) dfWithConverted.show() |
SQL UDF FUNCTIONS¶
In [83]:
from pyspark.sql.types import (
DoubleType,
FloatType,
IntegerType,
StringType,
StructField,
StructType,
)
# List all the UDF functions registered in the Spark catalog
def list_sql_udf_functions(spark_session):
# List all registered functions
functions = spark_session.catalog.listFunctions()
# Display the names of the functions that are UDFs
udf_functions = []
for func in functions:
if func.className and "UDFRegistration" in func.className:
udf_functions.append(func.name)
return udf_functions
# Define the function that divides the input value by 1000
def divide_by_1000_spark_sql(value):
return value / 1000 if value is not None else None
# Define the function that multiplies the input value by 10
def multiply_by_10_spark_sql(value):
return value * 10 if value is not None else None
# Define the function that converts text to uppercase
def to_uppercase_spark_sql(value):
return value.upper() if value is not None else None
# Define the function that replaces spaces with "&"
def replace_spaces_with_ampersand_spark_sql(value):
return value.replace(" ", "&") if value is not None else None
# Define the function that replaces lowercase vowels with uppercase vowels
def replace_lowercase_vowels_with_uppercase_spark_sql(value):
if value is None:
return None
# Map lowercase vowels to uppercase in a single pass
vowels = {'a': 'A', 'e': 'E', 'i': 'I', 'o': 'O', 'u': 'U'}
return ''.join(vowels.get(c, c) for c in value)
# Define the function that replaces vowels and some characters with similar-looking numbers
def replace_chars_with_similar_numbers_spark_sql(value):
if value is None:
return None
# Map characters to similar-looking numbers for both uppercase and lowercase
chars_to_numbers = {
'a': '4', 'e': '3', 'i': '1', 'o': '0', 's': '5', 't': '7', 'b': '8', 'g': '9',
'l': '1', 'z': '2',
'A': '4', 'E': '3', 'I': '1', 'O': '0', 'S': '5', 'T': '7', 'B': '8', 'G': '9',
'L': '1', 'Z': '2'
}
return ''.join(chars_to_numbers.get(c, c) for c in value)
Register the UDF function into the Spark SQL catalog Just for use in SQL queries¶
In [84]:
# Register the function as a UDF in Spark
spark_session.udf.register(name="divide_column_value_by_1000", f=divide_by_1000_spark_sql, returnType=FloatType())
spark_session.udf.register(name="multiply_values_10_times", f=multiply_by_10_spark_sql, returnType=IntegerType())
spark_session.udf.register(name="convert_string_to_uppercase", f=to_uppercase_spark_sql, returnType=StringType())
spark_session.udf.register(name="convert_blank_spaces_with_ampersand", f=replace_spaces_with_ampersand_spark_sql, returnType=StringType())
spark_session.udf.register(name="replace_lowercase_vowels_with_uppercase_vowels", f=replace_lowercase_vowels_with_uppercase_spark_sql, returnType=StringType())
spark_session.udf.register(name="replace_chars_by_numbers", f=replace_chars_with_similar_numbers_spark_sql, returnType=StringType())
Out[84]:
<function __main__.replace_chars_with_similar_numbers_spark_sql(value)>
List all the SQL UDF functions registered in the Spark catalog¶
In [85]:
# list udf functions
udf_functions = list_sql_udf_functions(spark_session)
print(udf_functions)
['convert_blank_spaces_with_ampersand', 'convert_string_to_uppercase', 'divide_column_value_by_1000', 'multiply_values_10_times', 'replace_chars_by_numbers', 'replace_lowercase_vowels_with_uppercase_vowels']
USE UDF SQL FUNCTIONS¶
In [86]:
database = "delta_spark_database"
table1 = "delta_dataframe_uno"
query = f"""
SELECT
id,
convert_string_to_uppercase(name) AS name,
divide_column_value_by_1000(age) AS age,
multiply_values_10_times(related_id) AS related_id,
replace_lowercase_vowels_with_uppercase_vowels(zodiac_sign) AS zodiac_sign,
replace_chars_by_numbers(country) AS country,
values
FROM {database}.{table1} AS TABLE_UNO
"""
spark_dataframe = execute_spark_sql_query(spark_session, query)
spark_dataframe.toPandas()
Out[86]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Register UDF Functions in Spark Session Just for DataFrame Operations Using Spark Decorator¶
EXAMPLES
Python¶
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
# Define the function
def divide_by_1000_spark_dataframe(value):
return value / 1000 if value is not None else None
# Register the function as a UDF
divide_by_1000_udf = udf(divide_by_1000_spark_dataframe, DoubleType())
# Create a sample DataFrame
data = [(1000,), (2000,), (None,)]
df = spark.createDataFrame(data, ["value"])
# Use the UDF in the DataFrame
df_with_converted = df.withColumn("converted_value", divide_by_1000_udf(df["value"]))
df_with_converted.show()
Scala¶
import org.apache.spark.sql.functions.udf
// Define the function
val divideBy1000 = (value: java.lang.Double) => {
if (value != null) value / 1000 else null
}
// Register the function as a UDF
val divideBy1000UDF = udf(divideBy1000)
// Create a sample DataFrame
val data = Seq((1000.0), (2000.0), (null))
val df = spark.createDataFrame(data.map(Tuple1(_))).toDF("value")
// Use the UDF in the DataFrame
val dfWithConverted = df.withColumn("converted_value", divideBy1000UDF($"value"))
dfWithConverted.show()
In [87]:
from pyspark.sql.functions import udf
# Define the function that divides the input value by 1000
@udf(returnType=FloatType())
def divide_by_1000_spark_dataframe(value):
return value / 1000 if value is not None else None
# Define the function that multiplies the input value by 10
@udf(returnType=IntegerType())
def multiply_by_10_spark_dataframe(value):
return value * 10 if value is not None else None
# Define the function that converts text to uppercase
@udf(returnType=StringType())
def to_uppercase_spark_dataframe(value):
return value.upper() if value is not None else None
# Define the function that replaces spaces with "&"
@udf(returnType=StringType())
def replace_spaces_with_ampersand_spark_dataframe(value):
return value.replace(" ", "&") if value is not None else None
In [88]:
# Define the function that replaces lowercase vowels with uppercase vowels
def replace_lowercase_vowels_with_uppercase_spark_dataframe(value):
if value is None:
return None
# Map lowercase vowels to uppercase in a single pass
vowels = {'a': 'A', 'e': 'E', 'i': 'I', 'o': 'O', 'u': 'U'}
return ''.join(vowels.get(c, c) for c in value)
# Define the function that replaces vowels and some characters with similar-looking numbers
def replace_chars_with_similar_numbers_spark_dataframe(value):
if value is None:
return None
# Map characters to similar-looking numbers for both uppercase and lowercase
chars_to_numbers = {
'a': '4', 'e': '3', 'i': '1', 'o': '0', 's': '5', 't': '7', 'b': '8', 'g': '9',
'l': '1', 'z': '2',
'A': '4', 'E': '3', 'I': '1', 'O': '0', 'S': '5', 'T': '7', 'B': '8', 'G': '9',
'L': '1', 'Z': '2'
}
return ''.join(chars_to_numbers.get(c, c) for c in value)
# Register the function as a UDF
replace_lowercase_vowels_with_uppercase = udf(replace_lowercase_vowels_with_uppercase_spark_dataframe, StringType())
replace_chars_with_similar_numbers = udf(replace_chars_with_similar_numbers_spark_dataframe, StringType())
USE THE UDF DATAFRAME FUNCTIONS¶
In [89]:
# Aplicar las UDFs a las columnas específicas correctamente
spark_dataframe_uno \
.withColumn("age", divide_by_1000_spark_dataframe(col("age"))) \
.withColumn("related_id", multiply_by_10_spark_dataframe(col("related_id"))) \
.withColumn("name", to_uppercase_spark_dataframe(col("name"))) \
.withColumn("zodiac_sign", replace_lowercase_vowels_with_uppercase(col("zodiac_sign"))) \
.withColumn("country", replace_chars_with_similar_numbers(col("country"))) \
.toPandas()
Out[89]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Perform JOINs Using the Function with Parameters
RENAME DATAFRAMES COLUMNS FOR EXAMPLES¶
In [90]:
spark_dataframe_uno_renamed = spark_dataframe_uno.select(
spark_dataframe_uno["id"].alias("spark_dataframe_uno_id"),
spark_dataframe_uno["name"].alias("spark_dataframe_uno_name"),
spark_dataframe_uno["age"].alias("spark_dataframe_uno_age"),
spark_dataframe_uno["related_id"].alias("spark_dataframe_uno_related_id"),
spark_dataframe_uno["zodiac_sign"].alias("spark_dataframe_uno_zodiac_sign"),
spark_dataframe_uno["country"].alias("spark_dataframe_uno_country"),
spark_dataframe_uno["values"].alias("spark_dataframe_uno_values"),
)
spark_dataframe_dos_renamed = spark_dataframe_dos.select(
spark_dataframe_dos["id"].alias("spark_dataframe_dos_id"),
spark_dataframe_dos["name"].alias("spark_dataframe_dos_name"),
spark_dataframe_dos["age"].alias("spark_dataframe_dos_age"),
spark_dataframe_dos["related_id"].alias("spark_dataframe_dos_related_id"),
spark_dataframe_dos["zodiac_sign"].alias("spark_dataframe_dos_zodiac_sign"),
spark_dataframe_dos["country"].alias("spark_dataframe_dos_country"),
spark_dataframe_dos["values"].alias("spark_dataframe_dos_values"),
)
In [91]:
spark_dataframe_uno_renamed.toPandas()
Out[91]:
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [92]:
joins = [
"inner_join",
"left_join",
"anti_left_join",
"right_join",
"anti_right_join",
"outer_join",
"anti_outer_join",
"cross_join",
"intersect",
"except",
"union",
"union_all",
]
for join in joins:
join_results = join_spark_dataframes(
left_df=spark_dataframe_uno_renamed,
right_df=spark_dataframe_dos_renamed,
join_type=join,
left_column="spark_dataframe_uno_id",
right_column="spark_dataframe_dos_related_id",
return_pandas=True,
)
print(f"Join -> {join}")
display(join_results)
print()
print("Join -> self")
display(join_spark_dataframes(
left_df=spark_dataframe_uno_renamed,
right_df=spark_dataframe_dos_renamed,
join_type='self',
left_column="spark_dataframe_uno_id",
right_column="spark_dataframe_uno_related_id",
return_pandas=True,
))
print()
Join -> inner_join
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values | spark_dataframe_dos_id | spark_dataframe_dos_name | spark_dataframe_dos_age | spark_dataframe_dos_related_id | spark_dataframe_dos_zodiac_sign | spark_dataframe_dos_country | spark_dataframe_dos_values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> left_join
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values | spark_dataframe_dos_id | spark_dataframe_dos_name | spark_dataframe_dos_age | spark_dataframe_dos_related_id | spark_dataframe_dos_zodiac_sign | spark_dataframe_dos_country | spark_dataframe_dos_values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> anti_left_join
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> right_join
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values | spark_dataframe_dos_id | spark_dataframe_dos_name | spark_dataframe_dos_age | spark_dataframe_dos_related_id | spark_dataframe_dos_zodiac_sign | spark_dataframe_dos_country | spark_dataframe_dos_values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> anti_right_join
| spark_dataframe_dos_id | spark_dataframe_dos_name | spark_dataframe_dos_age | spark_dataframe_dos_related_id | spark_dataframe_dos_zodiac_sign | spark_dataframe_dos_country | spark_dataframe_dos_values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> outer_join
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values | spark_dataframe_dos_id | spark_dataframe_dos_name | spark_dataframe_dos_age | spark_dataframe_dos_related_id | spark_dataframe_dos_zodiac_sign | spark_dataframe_dos_country | spark_dataframe_dos_values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> anti_outer_join
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values | spark_dataframe_dos_id | spark_dataframe_dos_name | spark_dataframe_dos_age | spark_dataframe_dos_related_id | spark_dataframe_dos_zodiac_sign | spark_dataframe_dos_country | spark_dataframe_dos_values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> cross_join
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values | spark_dataframe_dos_id | spark_dataframe_dos_name | spark_dataframe_dos_age | spark_dataframe_dos_related_id | spark_dataframe_dos_zodiac_sign | spark_dataframe_dos_country | spark_dataframe_dos_values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> intersect
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> except
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> union
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> union_all
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values | spark_dataframe_dos_id | spark_dataframe_dos_name | spark_dataframe_dos_age | spark_dataframe_dos_related_id | spark_dataframe_dos_zodiac_sign | spark_dataframe_dos_country | spark_dataframe_dos_values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Join -> self
| spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values | spark_dataframe_uno_id | spark_dataframe_uno_name | spark_dataframe_uno_age | spark_dataframe_uno_related_id | spark_dataframe_uno_zodiac_sign | spark_dataframe_uno_country | spark_dataframe_uno_values |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [93]:
list_all_databases_and_tables(spark_session)
The following databases and tables are present in the Spark Catalog. Database: default, Table: delta_sql Database: delta_spark_database, Table: delta_dataframe_1 Database: delta_spark_database, Table: delta_dataframe_2 Database: delta_spark_database, Table: delta_dataframe_all Database: delta_spark_database, Table: delta_dataframe_dos Database: delta_spark_database, Table: delta_dataframe_uno
Out[93]:
{'default': ['delta_sql'],
'delta_spark_database': ['delta_dataframe_1',
'delta_dataframe_2',
'delta_dataframe_all',
'delta_dataframe_dos',
'delta_dataframe_uno']}
CRUD Operations in Spark
In [94]:
file_path_delta_1 = 'warehouse-spark/spark_catalog/database/delta_spark_database.db/delta_dataframe_uno'
INSERT¶
In [95]:
# Call the function to insert data into the Delta table within the 'delta_spark_database' database
insert_into_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno", spark_dataframe_dos)
write_into_delta_lake(spark_session, file_path_delta_1, spark_dataframe_dos)
Records inserted into Delta table: delta_spark_database.delta_dataframe_uno No new rows to append.
UPDATE¶
In [96]:
update_in_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno",
"zodiac_sign = 'Capricorn'", {"zodiac_sign": "Capricornio"})
update_in_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno",
"zodiac_sign = 'Cancer'", {"zodiac_sign": "Capricornio"})
update_from_delta_lake(spark_session, file_path_delta_1, "age >= 15", {"age": "age * 1.5", "name": "UPPER(name)"})
Records matching condition 'zodiac_sign = 'Capricorn'' updated in Delta table: delta_spark_database.delta_dataframe_uno Records matching condition 'zodiac_sign = 'Cancer'' updated in Delta table: delta_spark_database.delta_dataframe_uno Updated records with condition 'age >= 15'.
Out[96]:
| format | id | name | description | location | createdAt | lastModified | partitionColumns | clusteringColumns | numFiles | sizeInBytes | properties | minReaderVersion | minWriterVersion | tableFeatures |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
DELETE¶
In [97]:
delete_condition = "id > 20"
# Call the function
delete_from_delta_table(spark_session, "delta_spark_database", "delta_dataframe_uno", delete_condition)
delete_from_delta_lake(spark_session, file_path_delta_1, 'age >= 40')
Records matching condition 'id > 20' deleted from Delta table: delta_spark_database.delta_dataframe_uno No records match the condition 'age >= 40'.
Out[97]:
| id | name | age | related_id | zodiac_sign | country | values |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
RESTORE VERSION¶
In [98]:
restore_delta_lake_to_version(spark_session, file_path_delta_1, 1)
24/12/30 16:12:10 WARN DAGScheduler: Broadcasting large task binary with size 1078.9 KiB
Restored to version 1.
Out[98]:
| format | id | name | description | location | createdAt | lastModified | partitionColumns | clusteringColumns | numFiles | sizeInBytes | properties | minReaderVersion | minWriterVersion | tableFeatures |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
MERGE¶
In [99]:
# Sample data
data0 = [
{
"id": 23, "name": "Sun", "age": 13, "related_id": 7, "zodiac_sign": "Leo", "country": "Brazil", "values": [31, 13, 3, 7, 11]
},
{
"id": 24, "name": "Moon", "age": 31, "related_id": 7, "zodiac_sign": "Capricorn", "country": "Bolivia", "values": [12, 13, 27, 31]
},
{
"id": 25, "name": "Earth", "age": 25, "related_id": 7, "zodiac_sign": "Taurus", "country": "USA", "values": [15, 28, 7]
},
{
"id": 26, "name": "Mars", "age": 35, "related_id": 8, "zodiac_sign": "Aries", "country": "Argentina", "values": [18, 21, 4]
},
]
data1 = [
{
"id": 27, "name": "Venus", "age": 29, "related_id": 5, "zodiac_sign": "Libra", "country": "Argentina", "values": [19, 25, 8, 11]
},
{
"id": 28, "name": "Jupiter", "age": 45, "related_id": 6, "zodiac_sign": "Sagittarius", "country": "Brazil", "values": [23, 30, 12]
},
{
"id": 29, "name": "Saturn", "age": 60, "related_id": 3, "zodiac_sign": "Aquarius", "country": "Peru", "values": [17, 35, 22, 40]
},
{
"id": 30, "name": "Uranus", "age": 28, "related_id": 9, "zodiac_sign": "Scorpio", "country": "Uruguay", "values": [20, 26, 14]
},
{
"id": 31, "name": "Neptune", "age": 50, "related_id": 4, "zodiac_sign": "Pisces", "country": "Boliva", "values": [29, 19, 33]
}
]
data2 = [
{
"id": 32, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Cancer", "country": "Peru", "values": [8, 13, 5]
},
{
"id": 33, "name": "Ceres", "age": 38, "related_id": 10, "zodiac_sign": "Virgo", "country": "Brazil", "values": [21, 18, 6]
},
{
"id": 34, "name": "Eris", "age": 22, "related_id": 11, "zodiac_sign": "Gemini", "country": "Peru", "values": [15, 25, 12]
},
{
"id": 35, "name": "Haumea", "age": 19, "related_id": 1, "zodiac_sign": "Aries", "country": "Mexico", "values": [10, 14, 7, 9]
},
{
"id": 36, "name": "Makemake", "age": 27, "related_id": 12, "zodiac_sign": "Taurus", "country": "Colombia", "values": [18, 22, 11]
}
]
# Define schema for data (this step is optional, but ensures correct data types)
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("related_id", IntegerType(), True),
StructField("zodiac_sign", StringType(), True),
StructField("country", StringType(), True),
StructField("values", ArrayType(IntegerType()), True),
])
# Create a DataFrame from the sample data
for data in [data0, data1, data2]:
new_data_df = spark_session.createDataFrame(data, schema)
display(merge_from_delta_lake(spark_session, file_path_delta_1, new_data_df, "id"))
Merge operation completed successfully.
None
Merge operation completed successfully.
None
Merge operation completed successfully.
None
INSERT DATA AVOING DUPLICATES¶
In [100]:
data2 = [
{
"id": 32, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Cancer", "country": "Peru", "values": [8, 13, 5]
},
{
"id": 33, "name": "Ceres", "age": 38, "related_id": 10, "zodiac_sign": "Virgo", "country": "Brazil", "values": [21, 18, 6]
},
{
"id": 99, "name": "Eris", "age": 22, "related_id": 11, "zodiac_sign": "Gemini", "country": "Peru", "values": [15, 25, 12]
},
{
"id": 35, "name": "Haumea", "age": 19, "related_id": 1, "zodiac_sign": "Aries", "country": "Mexico", "values": [10, 14, 7, 9]
},
{
"id": 36, "name": "Makemake", "age": 27, "related_id": 12, "zodiac_sign": "Taurus", "country": "Colombia", "values": [18, 22, 11]
}
]
for data in [data0, data1, data2]:
new_data_df = spark_session.createDataFrame(data, schema)
display(write_into_delta_lake(spark_session, file_path_delta_1, new_data_df))
No new rows to append.
None
No new rows to append.
None
Added new data without duplicates.
None
EVOLUTION SCHEMA¶
In [101]:
evolution_schema = [
{
"id": 36, "name": "Makemake", "age": 27, "related_id": 12, "zodiac_sign": "Taurus", "country": "Colombia", "values": [18, 22, 11], "color":"Blue"
},
{
"id": 1227, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Cancer", "country": "Peru", "values": [8, 13, 5], "fruit": "Guama"
}
]
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("related_id", IntegerType(), True),
StructField("zodiac_sign", StringType(), True),
StructField("country", StringType(), True),
StructField("values", ArrayType(IntegerType()), True),
StructField("color", StringType(), True),
StructField("fruit", StringType(), True),
])
for data in [evolution_schema]:
new_data_df = spark_session.createDataFrame(data, schema)
display(write_into_delta_lake(spark_session, file_path_delta_1, new_data_df))
evolution_schema_2 = [
{
"id": 7831, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Capricorn", "country": "Peru", "values": [8, 13, 5], "continent": "America"
},
{
"id": 7831, "name": "Pluto", "age": 15, "related_id": 2, "zodiac_sign": "Leo", "country": "Peru", "values": [8, 13, 5], "continent": "America", "sport":"Tennis"
}
]
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("related_id", IntegerType(), True),
StructField("zodiac_sign", StringType(), True),
StructField("country", StringType(), True),
StructField("values", ArrayType(IntegerType()), True),
StructField("continent", StringType(), True),
])
for data in [evolution_schema_2]:
new_data_df = spark_session.createDataFrame(data, schema)
merge_from_delta_lake(spark_session, file_path_delta_1, new_data_df, "id")
Added new data without duplicates.
None
Merge operation completed successfully.
Review the version history to explore Delta Lake's time travel functionality.¶
In [102]:
historic_version = show_historic_version_from_delta_file(spark_session, file_path_delta_1)
historic_version
Out[102]:
| version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
CONTENT CHANGES BY VERSION Method 1 - History Version¶
In [103]:
total_versions = max(historic_version.version)
total_operations = historic_version.operation.to_list()
total_operations.reverse()
for version in range(total_versions + 1):
print(f"Version : {version}, Operation : {total_operations[version]}, Content")
display(show_historic_version_from_delta_file(spark_session, file_path_delta_1, version, None, 'id'))
print()
Version : 0, Operation : CREATE OR REPLACE TABLE AS SELECT, Content
| id | name | age | related_id | zodiac_sign | country | values | |
|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 1, Operation : SET TBLPROPERTIES, Content
| age | country | id | name | related_id | values | zodiac_sign |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 2, Operation : WRITE, Content
| age | country | id | name | related_id | values | zodiac_sign | |
|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 3, Operation : OPTIMIZE, Content
| age | country | id | name | related_id | values | zodiac_sign |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 4, Operation : UPDATE, Content
| age | country | id | name | related_id | values | zodiac_sign | ChangeType | |
|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 5, Operation : UPDATE, Content
| age | country | id | name | related_id | values | zodiac_sign | ChangeType | |
|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 6, Operation : UPDATE, Content
| age | country | id | name | related_id | values | zodiac_sign | ChangeType | |
|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 7, Operation : DELETE, Content
| age | country | id | name | related_id | values | zodiac_sign | ChangeType | |
|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 8, Operation : RESTORE, Content
| age | country | id | name | related_id | values | zodiac_sign | |
|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 9, Operation : MERGE, Content
| age | country | id | name | related_id | values | zodiac_sign | |
|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 10, Operation : OPTIMIZE, Content
| age | country | id | name | related_id | values | zodiac_sign |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 11, Operation : MERGE, Content
| age | country | id | name | related_id | values | zodiac_sign | |
|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 12, Operation : MERGE, Content
| age | country | id | name | related_id | values | zodiac_sign | |
|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 13, Operation : OPTIMIZE, Content
| age | country | id | name | related_id | values | zodiac_sign |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 14, Operation : WRITE, Content
| age | country | id | name | related_id | values | zodiac_sign |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 15, Operation : OPTIMIZE, Content
| age | country | id | name | related_id | values | zodiac_sign |
|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 16, Operation : WRITE, Content
| age | color | country | fruit | id | name | related_id | values | zodiac_sign |
|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 17, Operation : OPTIMIZE, Content
| age | color | country | fruit | id | name | related_id | values | zodiac_sign |
|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 18, Operation : MERGE, Content
| age | color | continent | country | fruit | id | name | related_id | values | zodiac_sign |
|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
CONTENT CHANGES BY VERSION Method 2 - Change Data Feed Version Validation and Control¶
In [104]:
total_versions = max(historic_version.version)
total_operations = historic_version.operation.to_list()
total_operations.reverse()
for version in range(total_versions + 1):
print(f"Version : {version}, Operation : {total_operations[version]}, Content")
display(read_delta_table_with_change_data_control(spark_session, file_path_delta_1, version, version))
print()
Version : 0, Operation : CREATE OR REPLACE TABLE AS SELECT, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 1, Operation : SET TBLPROPERTIES, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 2, Operation : WRITE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 3, Operation : OPTIMIZE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 4, Operation : UPDATE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 5, Operation : UPDATE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 6, Operation : UPDATE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 7, Operation : DELETE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 8, Operation : RESTORE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 9, Operation : MERGE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 10, Operation : OPTIMIZE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 11, Operation : MERGE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 12, Operation : MERGE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 13, Operation : OPTIMIZE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 14, Operation : WRITE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 15, Operation : OPTIMIZE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 16, Operation : WRITE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 17, Operation : OPTIMIZE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 18, Operation : MERGE, Content
| id | name | age | related_id | zodiac_sign | country | values | color | fruit | continent | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Review the version history FROM FILE¶
DESCARGAR ARCHIVOS¶
In [105]:
import pandas as pd
import os
import gc
# Path for the directory
file_path = "documents/"
file_text_1 = "https://raw.githubusercontent.com/JorgeCardona/recursos/refs/heads/main/delta-spark/documents/IngenieriaDeDatos.txt"
file_text_2 = "https://raw.githubusercontent.com/JorgeCardona/recursos/refs/heads/main/delta-spark/documents/DeltaLake.txt"
file_text_3 = "https://raw.githubusercontent.com/JorgeCardona/recursos/refs/heads/main/delta-spark/documents/MLOps.txt"
# Create the directory if it doesn't exist
os.makedirs(file_path, exist_ok=True)
for file_url in [file_text_1, file_text_2, file_text_3]:
# Read the text file from the URL
pandas_df = pd.read_csv(file_url, encoding="utf-8")
# Get the file name
file_name = file_url.split("/")[-1]
print(file_name)
# Path to save the file
save_path = f"{file_path}/{file_name}"
# Save the text file without modifications
pandas_df.to_csv(save_path, index=False, encoding="utf-8")
print(f"File downloaded: {file_name} and saved in the directory -> {save_path}")
# Release memory by deleting the DataFrame
del pandas_df
gc.collect() # Force garbage collection
print("Pandas DataFrame removed from memory.")
IngenieriaDeDatos.txt File downloaded: IngenieriaDeDatos.txt and saved in the directory -> documents//IngenieriaDeDatos.txt DeltaLake.txt File downloaded: DeltaLake.txt and saved in the directory -> documents//DeltaLake.txt MLOps.txt File downloaded: MLOps.txt and saved in the directory -> documents//MLOps.txt Pandas DataFrame removed from memory.
PROCESAR ARCHIVOS¶
In [106]:
# Read the text file as a Spark DataFrame
df_text_1 = spark_session.read.text("documents/IngenieriaDeDatos.txt")
df_text_2 = spark_session.read.text("documents/DeltaLake.txt")
df_text_3 = spark_session.read.text("documents/MLOps.txt")
delta_lake_text_path = "warehouse-spark/spark_files/delta_parquet/texto"
# Save the combined DataFrame to a Delta table
for new_data_df in [df_text_1, df_text_2, df_text_3]:
display(write_into_delta_lake(spark_session, delta_lake_text_path, new_data_df))
# Read the data from the Delta Lake table and convert it to a Pandas DataFrame
spark_session.read.format("delta").load(delta_lake_text_path).toPandas()
warehouse-spark/spark_files/delta_parquet/texto does not contain a Delta table. Created Delta table with new data.
None
Added new data without duplicates.
None
Added new data without duplicates.
None
Out[106]:
| value |
|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Method 1 - History Version¶
In [107]:
historic_version_file = show_historic_version_from_delta_file(spark_session, delta_lake_text_path)
historic_version_file
Out[107]:
| version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [108]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()
for version in range(total_versions_file + 1):
print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
display(show_historic_version_from_delta_file(spark_session, delta_lake_text_path, version))
print()
Version : 0, Operation : WRITE, Content Warning: Column 'None' not found in the DataFrame.
| value |
|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 1, Operation : WRITE, Content Warning: Column 'None' not found in the DataFrame.
| value |
|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 2, Operation : OPTIMIZE, Content Warning: Column 'None' not found in the DataFrame.
| value |
|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 3, Operation : WRITE, Content Warning: Column 'None' not found in the DataFrame.
| value |
|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Method 2 - Change Data Feed Version Validation and Control¶
In [109]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()
for version in range(total_versions_file + 1):
print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
display(read_delta_table_with_change_data_control(spark_session, delta_lake_text_path, version, version))
print()
Version : 0, Operation : WRITE, Content
| value | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 1, Operation : WRITE, Content
| value | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 2, Operation : OPTIMIZE, Content
| value | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 3, Operation : WRITE, Content
| value | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
DELTA STREAMING¶
In [110]:
import pandas as pd
import numpy as np
from time import sleep
import os
def process_and_save_files(
csv_streaming_path,
base_streaming_directory,
metadata_streaming_path,
delta_output_streaming_path,
file_name='flight_logs',
num_files=5,
sleep_time=10,
raw_url_base='https://raw.githubusercontent.com/JorgeCardona/recursos/main/datasets/',
file_format='csv',
multiline_json=False
):
"""
Processes and saves streaming files in the specified formats (csv, json, or parquet), merging data from two parts
based on the 'id' or 'flight_id' column, and saves the final merged file.
Parameters:
csv_streaming_path (str): Path to save the processed CSV files.
base_streaming_directory (str): Base directory for file storage.
metadata_streaming_path (str): Path for metadata storage.
delta_output_streaming_path (str): Path to save Delta format files.
file_name (str, optional): Base name for the files. Default is 'flight_logs'.
num_files (int, optional): Number of files to process. Default is 5.
sleep_time (int, optional): Time in seconds to wait before processing the next file. Default is 10 seconds.
raw_url_base (str, optional): Base URL for downloading files. Default is a GitHub URL.
file_format (str, optional): File format to process ('csv', 'json', 'parquet'). Default is 'csv'.
multiline_json (bool, optional): Whether to handle JSON files as multiline. Default is False.
Raises:
ValueError: If the file format is unsupported.
"""
# Create directories if they do not exist
os.makedirs(csv_streaming_path, exist_ok=True)
os.makedirs(metadata_streaming_path, exist_ok=True)
os.makedirs(delta_output_streaming_path, exist_ok=True)
# Process files
for index in range(1, num_files + 1):
final_file = f'{csv_streaming_path}/{file_name}_{index}.{file_format}'
# Download and read the first part of the file in the appropriate format
file_1 = f'{file_name}_part_1_{index}.{file_format}'
base_url1 = f'{raw_url_base}{file_1}'
if file_format == 'csv':
df1 = pd.read_csv(base_url1)
elif file_format == 'json':
if multiline_json:
df1 = pd.read_json(base_url1, lines=True) # For multiline JSON
else:
df1 = pd.read_json(base_url1) # For JSON in list
elif file_format == 'parquet':
df1 = pd.read_parquet(base_url1)
else:
raise ValueError(f"Unsupported file format {file_format}")
# Download and read the second part of the file in the appropriate format
file_2 = f'{file_name}_part_2_{index}.{file_format}'
base_url2 = f'{raw_url_base}{file_2}'
if file_format == 'csv':
df2 = pd.read_csv(base_url2)
elif file_format == 'json':
if multiline_json:
df2 = pd.read_json(base_url2, lines=True) # For multiline JSON
else:
df2 = pd.read_json(base_url2) # For JSON in list
elif file_format == 'parquet':
df2 = pd.read_parquet(base_url2)
else:
raise ValueError(f"Unsupported file format {file_format}")
# Merge the data on the 'id' or 'flight_id' column
df3 = pd.merge(df1, df2, left_on='id', right_on='flight_id', how='inner')
# Save the final file in the appropriate format
if file_format == 'csv':
df3.to_csv(f'{final_file}', index=False)
elif file_format == 'json':
df3.to_json(f'{final_file}', orient='records', lines=True)
elif file_format == 'parquet':
df3.to_parquet(f'{final_file}')
else:
raise ValueError(f"Unsupported file format {file_format}")
print(f'{final_file} saved Successfully!!')
# Pause for the specified time
sleep(sleep_time)
DEFINE THE STRUCTURE OF THE DATA TO BE READ, TO ASSIGN THE APPROPRIATE SCHEMA¶
In [111]:
import shutil
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, lit
from pyspark.sql.types import (
DoubleType,
FloatType,
IntegerType,
StringType,
StructField,
StructType,
)
# Define the schema
customSchema = StructType(
[
StructField("id", IntegerType(), True),
StructField("secure_code", StringType(), True),
StructField("airline", StringType(), True),
StructField("departure_city", StringType(), True),
StructField("departure_date", StringType(), True),
StructField("arrival_airport", StringType(), True),
StructField("arrival_city", StringType(), True),
StructField("arrival_time", StringType(), True),
StructField("passenger_name", StringType(), True),
StructField("passenger_gender", StringType(), True),
StructField("seat_number", StringType(), True),
StructField("currency", StringType(), True),
StructField("departure_gate", StringType(), True),
StructField("flight_status", StringType(), True),
StructField("co_pilot_name", StringType(), True),
StructField("aircraft_type", StringType(), True),
StructField("fuel_consumption", DoubleType(), True),
StructField("flight_id", IntegerType(), True),
StructField("flight_number", IntegerType(), True),
StructField("departure_airport", StringType(), True),
StructField("departure_country", StringType(), True),
StructField("departure_time", StringType(), True),
StructField("arrival_country", StringType(), True),
StructField("arrival_date", StringType(), True),
StructField("flight_duration", DoubleType(), True),
StructField("passenger_age", IntegerType(), True),
StructField("passenger_nationality", StringType(), True),
StructField("ticket_price", DoubleType(), True),
StructField("baggage_weight", DoubleType(), True),
StructField("arrival_gate", StringType(), True),
StructField("pilot_name", StringType(), True),
StructField("cabin_crew_count", IntegerType(), True),
StructField("aircraft_registration", StringType(), True),
StructField("flight_distance", DoubleType(), True),
]
)
EXECUTE THE DELTA STREAMING VERSION¶
In [112]:
import time
from datetime import datetime
from pyspark.sql.functions import upper
def process_streaming_files(
spark_session,
input_path,
output_path,
checkpoint_path,
file_format="csv",
custom_schema=None,
header=True,
output_mode="append",
idle_timeout=20,
multiline_json=False, # For multiline JSON
capitalized_column="departure_city", # Parameter for the column to capitalize
replace_chars_with_numbers_column="airline" # Parameter for the column to replace characters by numbers
):
"""
Processes streaming files from the specified input path, handles different file formats (csv, json, parquet),
and writes the processed data to an output path in Delta format. It also handles checkpoints, manages idle timeout,
and performs data transformations such as capitalizing specified columns and replacing characters with similar numbers.
Parameters:
spark_session (SparkSession): The active Spark session used to run the streaming job.
input_path (str): The path where the streaming files are located.
output_path (str): The path where the processed Delta files will be written.
checkpoint_path (str): The path to store the checkpoint data.
file_format (str, optional): The format of the input files ('csv', 'json', or 'parquet'). Default is 'csv'.
custom_schema (StructType, optional): The schema to apply to the incoming data. Default is None.
header (bool, optional): Whether to include the header in CSV files. Default is True.
output_mode (str, optional): The output mode for the streaming job. Default is 'append'.
idle_timeout (int, optional): The timeout in seconds to stop the stream if no new data is processed. Default is 20 seconds.
multiline_json (bool, optional): Whether to handle JSON files as multiline. Default is False.
capitalized_column (str, optional): The name of the column whose values should be capitalized. Default is 'departure_city'.
replace_chars_with_numbers (str, optional): The name of the column whose characters should be replaced with similar-looking numbers. Default is 'airline'.
Raises:
ValueError: If the file format is unsupported.
"""
# Set up the stream reader depending on the file format
stream_reader = spark_session.readStream.format(file_format)
if file_format == "csv":
stream_reader = stream_reader.option("header", str(header).lower())
elif file_format == "json":
if multiline_json:
stream_reader = stream_reader.option("multiline", "true") # For multiline JSON
else:
stream_reader = stream_reader.option("multiline", "false") # For JSON by line
elif file_format == "parquet":
# No additional options needed for Parquet
pass
else:
raise ValueError(f"Unsupported file format {file_format}")
# Apply custom schema if provided
if custom_schema:
stream_reader = stream_reader.schema(custom_schema)
# Read the stream from the input path
input_stream = stream_reader.load(input_path)
# DATA TRANSFORMATIONs
# Capitalize the specified column if it exists in the DataFrame
if capitalized_column in input_stream.columns:
input_stream = input_stream.withColumn(
capitalized_column,
to_uppercase_spark_dataframe(input_stream[capitalized_column])
)
# Replace characters with similar numbers for another specified column if it exists in the DataFrame
if replace_chars_with_numbers_column in input_stream.columns:
input_stream = input_stream.withColumn(
replace_chars_with_numbers_column,
replace_chars_with_similar_numbers(input_stream[replace_chars_with_numbers_column])
)
# Write the stream to Delta format with checkpointing
output_stream = input_stream.writeStream \
.format("delta") \
.outputMode(output_mode) \
.option("checkpointLocation", checkpoint_path) \
.trigger(processingTime="10 seconds") # Process every 10 seconds
# Start the streaming query
query = output_stream.start(output_path)
last_processed_time = time.time() # Initialize the last processed time
while query.isActive:
# Get the last progress of the stream
last_progress = query.lastProgress
if last_progress:
processing_time = last_progress.get("processedRowsPerSecond", 0)
print(f"Last progress: {last_progress}, Processed per second: {processing_time}")
if processing_time == 0:
# If no new rows have been processed, check the idle time
time_since_last_processed = time.time() - last_processed_time
print(f"Time since last data processed: {time_since_last_processed} seconds.")
if time_since_last_processed > idle_timeout:
print(f"No new data for {idle_timeout} seconds. Stopping the stream.")
query.stop()
break
else:
last_processed_time = time.time() # Update the time when data is processed
# Sleep briefly before checking again
time.sleep(5)
print("Stream has stopped.")
DATA PROCESSING IN STREAMING WITH THREADS
In [113]:
import threading
# Function to execute 'process_and_save_files'
def run_process_and_save_files():
"""
Runs the process_and_save_files function to download, process, and save files in the specified directories.
"""
process_and_save_files(
csv_streaming_path="csv_streaming_files/",
base_streaming_directory="warehouse-spark/delta_spark_streaming",
metadata_streaming_path="warehouse-spark/delta_spark_streaming/metadata",
delta_output_streaming_path="warehouse-spark/delta_spark_streaming/proccesed",
file_name="flight_logs",
num_files=5,
sleep_time=10,
raw_url_base="https://raw.githubusercontent.com/JorgeCardona/recursos/main/datasets/",
)
# Function to execute 'process_streaming_files'
def run_process_streaming_files():
"""
Runs the process_streaming_files function to process streaming data and write it to a Delta table.
"""
process_streaming_files(
spark_session=spark_session,
input_path=csv_streaming_path,
output_path=delta_output_streaming_path,
checkpoint_path=metadata_streaming_path,
file_format="csv",
custom_schema=customSchema,
header=True,
output_mode="append",
idle_timeout=20,
capitalized_column="departure_city",
replace_chars_with_numbers_column="airline",
)
# Define directories for processing
csv_streaming_path = "csv_streaming_files/"
base_streaming_directory = "warehouse-spark/delta_spark_streaming"
metadata_streaming_path = f"{base_streaming_directory}/metadata"
delta_output_streaming_path = f"{base_streaming_directory}/proccesed"
# Create threads to run both functions in parallel
process_and_save_files_thread = threading.Thread(target=run_process_and_save_files)
process_streaming_files_thread = threading.Thread(target=run_process_streaming_files)
# Start both threads in parallel
process_and_save_files_thread.start()
process_streaming_files_thread.start()
# Wait for both threads to finish before continuing with the next cell
process_and_save_files_thread.join()
process_streaming_files_thread.join()
print("Both processes have finished.")
24/12/30 16:13:28 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
csv_streaming_files//flight_logs_1.csv saved Successfully!!
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:13:30.000Z', 'batchId': 0, 'numInputRows': 1000, 'inputRowsPerSecond': 1060.4453870625664, 'processedRowsPerSecond': 789.2659826361485, 'durationMs': {'addBatch': 1147, 'commitOffsets': 18, 'getBatch': 19, 'latestOffset': 20, 'queryPlanning': 31, 'triggerExecution': 1267, 'walCommit': 24}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': None, 'endOffset': {'logOffset': 0}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 1060.4453870625664, 'processedRowsPerSecond': 789.2659826361485}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 789.2659826361485
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:13:30.000Z', 'batchId': 0, 'numInputRows': 1000, 'inputRowsPerSecond': 1060.4453870625664, 'processedRowsPerSecond': 789.2659826361485, 'durationMs': {'addBatch': 1147, 'commitOffsets': 18, 'getBatch': 19, 'latestOffset': 20, 'queryPlanning': 31, 'triggerExecution': 1267, 'walCommit': 24}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': None, 'endOffset': {'logOffset': 0}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 1060.4453870625664, 'processedRowsPerSecond': 789.2659826361485}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 789.2659826361485
csv_streaming_files//flight_logs_2.csv saved Successfully!!
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:13:30.000Z', 'batchId': 0, 'numInputRows': 1000, 'inputRowsPerSecond': 1060.4453870625664, 'processedRowsPerSecond': 789.2659826361485, 'durationMs': {'addBatch': 1147, 'commitOffsets': 18, 'getBatch': 19, 'latestOffset': 20, 'queryPlanning': 31, 'triggerExecution': 1267, 'walCommit': 24}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': None, 'endOffset': {'logOffset': 0}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 1060.4453870625664, 'processedRowsPerSecond': 789.2659826361485}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 789.2659826361485
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:13:30.000Z', 'batchId': 0, 'numInputRows': 1000, 'inputRowsPerSecond': 1060.4453870625664, 'processedRowsPerSecond': 789.2659826361485, 'durationMs': {'addBatch': 1147, 'commitOffsets': 18, 'getBatch': 19, 'latestOffset': 20, 'queryPlanning': 31, 'triggerExecution': 1267, 'walCommit': 24}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': None, 'endOffset': {'logOffset': 0}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 1060.4453870625664, 'processedRowsPerSecond': 789.2659826361485}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 789.2659826361485
csv_streaming_files//flight_logs_3.csv saved Successfully!!
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:13:50.000Z', 'batchId': 1, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0100010001, 'processedRowsPerSecond': 2288.329519450801, 'durationMs': {'addBatch': 373, 'commitOffsets': 17, 'getBatch': 5, 'latestOffset': 17, 'queryPlanning': 6, 'triggerExecution': 437, 'walCommit': 17}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 0}, 'endOffset': {'logOffset': 1}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0100010001, 'processedRowsPerSecond': 2288.329519450801}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 2288.329519450801
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:13:50.000Z', 'batchId': 1, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0100010001, 'processedRowsPerSecond': 2288.329519450801, 'durationMs': {'addBatch': 373, 'commitOffsets': 17, 'getBatch': 5, 'latestOffset': 17, 'queryPlanning': 6, 'triggerExecution': 437, 'walCommit': 17}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 0}, 'endOffset': {'logOffset': 1}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0100010001, 'processedRowsPerSecond': 2288.329519450801}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 2288.329519450801
csv_streaming_files//flight_logs_4.csv saved Successfully!!
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:00.000Z', 'batchId': 2, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 973.7098344693283, 'durationMs': {'addBatch': 968, 'commitOffsets': 15, 'getBatch': 5, 'latestOffset': 17, 'queryPlanning': 5, 'triggerExecution': 1027, 'walCommit': 15}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 1}, 'endOffset': {'logOffset': 2}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 973.7098344693283}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 973.7098344693283
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:00.000Z', 'batchId': 2, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 973.7098344693283, 'durationMs': {'addBatch': 968, 'commitOffsets': 15, 'getBatch': 5, 'latestOffset': 17, 'queryPlanning': 5, 'triggerExecution': 1027, 'walCommit': 15}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 1}, 'endOffset': {'logOffset': 2}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 973.7098344693283}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 973.7098344693283
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:10.000Z', 'batchId': 3, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1248.4394506866415, 'durationMs': {'addBatch': 740, 'commitOffsets': 15, 'getBatch': 5, 'latestOffset': 17, 'queryPlanning': 5, 'triggerExecution': 801, 'walCommit': 15}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 2}, 'endOffset': {'logOffset': 3}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1248.4394506866415}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1248.4394506866415
csv_streaming_files//flight_logs_5.csv saved Successfully!!
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:10.000Z', 'batchId': 3, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1248.4394506866415, 'durationMs': {'addBatch': 740, 'commitOffsets': 15, 'getBatch': 5, 'latestOffset': 17, 'queryPlanning': 5, 'triggerExecution': 801, 'walCommit': 15}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 2}, 'endOffset': {'logOffset': 3}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1248.4394506866415}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1248.4394506866415
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:20.000Z', 'batchId': 4, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1153.4025374855826, 'durationMs': {'addBatch': 808, 'commitOffsets': 15, 'getBatch': 5, 'latestOffset': 18, 'queryPlanning': 5, 'triggerExecution': 867, 'walCommit': 15}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 3}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1153.4025374855826}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1153.4025374855826
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:20.000Z', 'batchId': 4, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1153.4025374855826, 'durationMs': {'addBatch': 808, 'commitOffsets': 15, 'getBatch': 5, 'latestOffset': 18, 'queryPlanning': 5, 'triggerExecution': 867, 'walCommit': 15}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 3}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1153.4025374855826}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1153.4025374855826
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:20.000Z', 'batchId': 4, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1153.4025374855826, 'durationMs': {'addBatch': 808, 'commitOffsets': 15, 'getBatch': 5, 'latestOffset': 18, 'queryPlanning': 5, 'triggerExecution': 867, 'walCommit': 15}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 3}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1153.4025374855826}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1153.4025374855826
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:20.000Z', 'batchId': 4, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1153.4025374855826, 'durationMs': {'addBatch': 808, 'commitOffsets': 15, 'getBatch': 5, 'latestOffset': 18, 'queryPlanning': 5, 'triggerExecution': 867, 'walCommit': 15}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 3}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 1000, 'inputRowsPerSecond': 100.0, 'processedRowsPerSecond': 1153.4025374855826}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 1153.4025374855826
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:40.001Z', 'batchId': 5, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 2, 'triggerExecution': 3}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 4}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last data processed: 5.002522945404053 seconds.
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:40.001Z', 'batchId': 5, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 2, 'triggerExecution': 3}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 4}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last data processed: 10.004566431045532 seconds.
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:40.001Z', 'batchId': 5, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 2, 'triggerExecution': 3}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 4}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last data processed: 15.006875276565552 seconds.
INFO:py4j.clientserver:Closing down clientserver connection
Last progress: {'id': 'd0566211-d539-4bc5-8a0e-bb56be5458df', 'runId': '548d0a4a-d83a-4051-ab31-67b440d477a7', 'name': None, 'timestamp': '2024-12-30T16:14:40.001Z', 'batchId': 5, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 2, 'triggerExecution': 3}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/notebooks/csv_streaming_files]', 'startOffset': {'logOffset': 4}, 'endOffset': {'logOffset': 4}, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'DeltaSink[warehouse-spark/delta_spark_streaming/proccesed]', 'numOutputRows': -1}}, Processed per second: 0.0
Time since last data processed: 20.008644104003906 seconds.
No new data for 20 seconds. Stopping the stream.
Stream has stopped.
Both processes have finished.
Method 1 - History Version¶
In [114]:
historic_version_file = show_historic_version_from_delta_file(spark_session, delta_output_streaming_path)
historic_version_file
Out[114]:
| version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [115]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()
for version in range(total_versions_file + 1):
print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
display(show_historic_version_from_delta_file(spark_session, delta_output_streaming_path, version))
print()
Version : 0, Operation : STREAMING UPDATE, Content Warning: Column 'None' not found in the DataFrame.
| id | secure_code | airline | departure_city | departure_date | arrival_airport | arrival_city | arrival_time | passenger_name | passenger_gender | seat_number | currency | departure_gate | flight_status | co_pilot_name | aircraft_type | fuel_consumption | flight_id | flight_number | departure_airport | departure_country | departure_time | arrival_country | arrival_date | flight_duration | passenger_age | passenger_nationality | ticket_price | baggage_weight | arrival_gate | pilot_name | cabin_crew_count | aircraft_registration | flight_distance |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 1, Operation : STREAMING UPDATE, Content Warning: Column 'None' not found in the DataFrame.
| aircraft_registration | aircraft_type | airline | arrival_airport | arrival_city | arrival_country | arrival_date | arrival_gate | arrival_time | baggage_weight | cabin_crew_count | co_pilot_name | currency | departure_airport | departure_city | departure_country | departure_date | departure_gate | departure_time | flight_distance | flight_duration | flight_id | flight_number | flight_status | fuel_consumption | id | passenger_age | passenger_gender | passenger_name | passenger_nationality | pilot_name | seat_number | secure_code | ticket_price |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 2, Operation : STREAMING UPDATE, Content Warning: Column 'None' not found in the DataFrame.
| aircraft_registration | aircraft_type | airline | arrival_airport | arrival_city | arrival_country | arrival_date | arrival_gate | arrival_time | baggage_weight | cabin_crew_count | co_pilot_name | currency | departure_airport | departure_city | departure_country | departure_date | departure_gate | departure_time | flight_distance | flight_duration | flight_id | flight_number | flight_status | fuel_consumption | id | passenger_age | passenger_gender | passenger_name | passenger_nationality | pilot_name | seat_number | secure_code | ticket_price |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 3, Operation : STREAMING UPDATE, Content Warning: Column 'None' not found in the DataFrame.
| aircraft_registration | aircraft_type | airline | arrival_airport | arrival_city | arrival_country | arrival_date | arrival_gate | arrival_time | baggage_weight | cabin_crew_count | co_pilot_name | currency | departure_airport | departure_city | departure_country | departure_date | departure_gate | departure_time | flight_distance | flight_duration | flight_id | flight_number | flight_status | fuel_consumption | id | passenger_age | passenger_gender | passenger_name | passenger_nationality | pilot_name | seat_number | secure_code | ticket_price |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 4, Operation : STREAMING UPDATE, Content Warning: Column 'None' not found in the DataFrame.
| aircraft_registration | aircraft_type | airline | arrival_airport | arrival_city | arrival_country | arrival_date | arrival_gate | arrival_time | baggage_weight | cabin_crew_count | co_pilot_name | currency | departure_airport | departure_city | departure_country | departure_date | departure_gate | departure_time | flight_distance | flight_duration | flight_id | flight_number | flight_status | fuel_consumption | id | passenger_age | passenger_gender | passenger_name | passenger_nationality | pilot_name | seat_number | secure_code | ticket_price |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Method 2 - Change Data Feed Version Validation and Control¶
In [116]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()
for version in range(total_versions_file + 1):
print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
display(read_delta_table_with_change_data_control(spark_session, delta_output_streaming_path, version, version))
print()
Version : 0, Operation : STREAMING UPDATE, Content
| id | secure_code | airline | departure_city | departure_date | arrival_airport | arrival_city | arrival_time | passenger_name | passenger_gender | seat_number | currency | departure_gate | flight_status | co_pilot_name | aircraft_type | fuel_consumption | flight_id | flight_number | departure_airport | departure_country | departure_time | arrival_country | arrival_date | flight_duration | passenger_age | passenger_nationality | ticket_price | baggage_weight | arrival_gate | pilot_name | cabin_crew_count | aircraft_registration | flight_distance | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 1, Operation : STREAMING UPDATE, Content
| id | secure_code | airline | departure_city | departure_date | arrival_airport | arrival_city | arrival_time | passenger_name | passenger_gender | seat_number | currency | departure_gate | flight_status | co_pilot_name | aircraft_type | fuel_consumption | flight_id | flight_number | departure_airport | departure_country | departure_time | arrival_country | arrival_date | flight_duration | passenger_age | passenger_nationality | ticket_price | baggage_weight | arrival_gate | pilot_name | cabin_crew_count | aircraft_registration | flight_distance | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 2, Operation : STREAMING UPDATE, Content
| id | secure_code | airline | departure_city | departure_date | arrival_airport | arrival_city | arrival_time | passenger_name | passenger_gender | seat_number | currency | departure_gate | flight_status | co_pilot_name | aircraft_type | fuel_consumption | flight_id | flight_number | departure_airport | departure_country | departure_time | arrival_country | arrival_date | flight_duration | passenger_age | passenger_nationality | ticket_price | baggage_weight | arrival_gate | pilot_name | cabin_crew_count | aircraft_registration | flight_distance | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 3, Operation : STREAMING UPDATE, Content
| id | secure_code | airline | departure_city | departure_date | arrival_airport | arrival_city | arrival_time | passenger_name | passenger_gender | seat_number | currency | departure_gate | flight_status | co_pilot_name | aircraft_type | fuel_consumption | flight_id | flight_number | departure_airport | departure_country | departure_time | arrival_country | arrival_date | flight_duration | passenger_age | passenger_nationality | ticket_price | baggage_weight | arrival_gate | pilot_name | cabin_crew_count | aircraft_registration | flight_distance | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 4, Operation : STREAMING UPDATE, Content
| id | secure_code | airline | departure_city | departure_date | arrival_airport | arrival_city | arrival_time | passenger_name | passenger_gender | seat_number | currency | departure_gate | flight_status | co_pilot_name | aircraft_type | fuel_consumption | flight_id | flight_number | departure_airport | departure_country | departure_time | arrival_country | arrival_date | flight_duration | passenger_age | passenger_nationality | ticket_price | baggage_weight | arrival_gate | pilot_name | cabin_crew_count | aircraft_registration | flight_distance | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [117]:
from time import sleep
from typing import Union
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import (
BinaryClassificationEvaluator,
MulticlassClassificationEvaluator,
RegressionEvaluator,
)
from pyspark.ml import Model
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, format_number, lit, udf, when
from pyspark.sql.types import FloatType, StringType, StructField, StructType
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, roc_curve, roc_auc_score
import seaborn as sns
FUNCTIONS¶
In [118]:
def create_dataframe_from_dataset(
dataset,
dataset_data_key="data",
dataset_columns=["feature_1", "feature_2", "feature_3", "feature_4"],
dataset_data_target="target",
):
"""
Create a pandas DataFrame from a dataset dictionary.
Args:
- dataset (dict): The dataset dictionary containing data and target keys.
- dataset_data_key (str): The key in the dataset dictionary corresponding to the data.
- dataset_columns (list): List of column names for the DataFrame.
- dataset_data_target (str): The key in the dataset dictionary corresponding to the target.
Returns:
- df (DataFrame): A pandas DataFrame containing the data and target from the dataset dictionary.
"""
# Create DataFrame
df = pd.DataFrame(dataset[dataset_data_key], columns=dataset_columns)
df[dataset_data_target] = dataset[dataset_data_target]
return df
def create_multiple_samples_datasets(
original_dataset, n_datasets, n_records, delay_time=5
):
"""
Create multiple datasets from an original dataset.
Args:
- original_dataset (Bunch): The original dataset from which to create new datasets.
- n_datasets (int): Number of datasets to create.
- n_records (int): Number of records in each new dataset.
- delay_time (int): Time to wait (in seconds) between creating each dataset.
Returns:
- datasets (list): A list of pandas DataFrames, each representing a new dataset.
"""
datasets = []
for dataset_index in range(n_datasets):
# Create a new dataset with n random records
indices = np.random.choice(
range(len(original_dataset.data)), size=n_records, replace=False
)
data = original_dataset.data[indices]
target = original_dataset.target[indices]
new_dataset = pd.DataFrame(data, columns=original_dataset.feature_names)
new_dataset["target"] = target
datasets.append(new_dataset)
# Write new dataset to a CSV file
new_dataset.to_csv(f"new_dataset_{dataset_index}.csv", index=False)
# Wait for a specified delay time
sleep(delay_time)
return datasets
def load_dataset():
"""
Load the Iris dataset and display it as a pandas DataFrame.
Returns:
- data_features (array): Array containing the features of the Iris dataset.
- target_classes (array): Array containing the target classes of the Iris dataset.
"""
# Load the Iris dataset
dataset = load_iris()
data_features, target_classes = dataset.data, dataset.target
key_column = "data"
features_columns = ["feature_1", "feature_2", "feature_3", "feature_4"]
taget_column = "target"
create_dataframe_from_dataset(
dataset,
dataset_data_key=key_column,
dataset_columns=features_columns,
dataset_data_target=taget_column,
)
return data_features, target_classes
def load_dataset_from_csv(csv_file):
"""
Load the Iris dataset from a CSV file and display it as a pandas DataFrame.
Args:
csv_file (str): Path to the CSV file containing the Iris dataset.
Returns:
- data_features (array): Array containing the features of the Iris dataset.
- target_classes (array): Array containing the target classes of the Iris dataset.
"""
# Load the Iris dataset from the CSV file
df = pd.read_csv(csv_file)
# Separate the feature columns and the target column
features_columns = [
"sepal length (cm)",
"sepal width (cm)",
"petal length (cm)",
"petal width (cm)",
]
target_column = "class"
data_features = df[features_columns].values
target_classes = df[target_column].values
return data_features, target_classes
def split_dataset_train_testing(data_features, target_classes):
"""
Split the dataset into training and testing sets.
Args:
- data_features (array): Array containing the features of the dataset.
- target_classes (array): Array containing the target classes of the dataset.
Returns:
- data_features_train_splitted (array): Features of the training set.
- data_features_test_splitted (array): Features of the testing set.
- target_classes_train_splitted (array): Target classes of the training set.
- target_classes_test_splitted (array): Target classes of the testing set.
"""
# Split the data into training and testing sets
(
data_features_train_splitted,
data_features_test_splitted,
target_classes_train_splitted,
target_classes_test_splitted,
) = train_test_split(data_features, target_classes, test_size=0.2, random_state=42)
return (
data_features_train_splitted,
data_features_test_splitted,
target_classes_train_splitted,
target_classes_test_splitted,
)
def get_filtered_dataset(data_features_dataset, target_classes_dataset):
"""
Convert dataset features and target classes to appropriate formats.
Args:
- data_features_dataset (array): Array containing the features of the dataset.
- target_classes_dataset (array): Array containing the target classes of the dataset.
Returns:
- data_features_filtered (list): List of feature vectors converted to Vectors.dense format.
- target_classes_filtered (list): List of target classes converted to floats.
"""
# Convert numpy values to Python floats
data_features_filtered = [
Vectors.dense(features) for features in data_features_dataset
]
target_classes_filtered = [
float(label) for label in target_classes_dataset
] # Convert to float
return data_features_filtered, target_classes_filtered
def create_spark_dataframe(
sparkml_session, data_features_dataset, target_classes_dataset
):
"""
Create a Spark DataFrame from the dataset features and target classes.
Args:
- data_features_dataset (array): Array containing the features of the dataset.
- target_classes_dataset (array): Array containing the target classes of the dataset.
Returns:
- final_data_set (DataFrame): A Spark DataFrame containing the features and target classes.
"""
# Create Spark DataFrames from training data
data_set = list(zip(data_features_dataset, target_classes_dataset))
schema = StructType(
[
StructField("features", VectorUDT(), True),
StructField("label", FloatType(), True),
]
)
final_data_set = sparkml_session.createDataFrame(data_set, schema=schema)
return final_data_set
def create_ml_trained_model(model_class: type[Model], train_df: DataFrame, model_params: dict) -> Model:
"""
Create and train an ML model using the specified model class and parameters.
Args:
- model_class (type[Model]): The ML model class to instantiate (e.g., LogisticRegression, DecisionTreeClassifier).
- train_df (DataFrame): DataFrame containing the training data.
- model_params (Dict): Dictionary of parameters to configure the model.
Returns:
- Model: The trained ML model (type will depend on the model class).
"""
# Instantiate the model with the provided parameters
model_instance = model_class(**model_params)
# Train the model using the provided training DataFrame
trained_model = model_instance.fit(train_df)
return trained_model
def model_prediction(model, test_df, target_names):
"""
Make predictions using a trained model and display the results.
Args:
- model: Trained machine learning model.
- test_df (DataFrame): DataFrame containing the test data.
Returns:
- prediction_result (DataFrame): DataFrame containing the prediction results.
"""
# Make predictions on the test data
prediction_result = model.transform(test_df)
# Format prediction column to display two decimal places
prediction_result = prediction_result.withColumn(
"formatted_prediction", format_number("prediction", 2)
)
# Map numeric labels back to target names
label_to_name_udf = udf(lambda label: target_names[int(label)], StringType())
# Create new columns with predicted and actual class names
prediction_result = prediction_result.withColumn(
"predicted_class_name", label_to_name_udf("prediction")
)
prediction_result = prediction_result.withColumn(
"actual_class_name", label_to_name_udf("label")
)
# Add new column to indicate correct or incorrect classification
prediction_result = prediction_result.withColumn(
"correctly_classified",
when(
prediction_result["predicted_class_name"]
== prediction_result["actual_class_name"],
"✓",
).otherwise("❌"),
)
# Display the results
prediction_result.show()
return prediction_result
def mse_rmse_mae_r2_avaluator(prediction_result):
"""
Evaluate regression metrics including MSE, RMSE, MAE, and R2 score,
and store the results in a pandas DataFrame.
Args:
- prediction_result (DataFrame): DataFrame containing the prediction results.
Returns:
- pandas.DataFrame: DataFrame with the evaluation metrics and their values.
"""
# Initialize evaluators
regression_evaluator_mse = RegressionEvaluator(
predictionCol="prediction", labelCol="label", metricName="mse"
)
regression_evaluator_rmse = RegressionEvaluator(
predictionCol="prediction", labelCol="label", metricName="rmse"
)
regression_evaluator_mae = RegressionEvaluator(
predictionCol="prediction", labelCol="label", metricName="mae"
)
r2_evaluator = RegressionEvaluator(
predictionCol="prediction", labelCol="label", metricName="r2"
)
# Calculate metrics and round to 4 decimal places
mse = round(regression_evaluator_mse.evaluate(prediction_result), 4) * 100
rmse = round(regression_evaluator_rmse.evaluate(prediction_result), 4) * 100
mae = round(regression_evaluator_mae.evaluate(prediction_result), 4) * 100
r2 = round(r2_evaluator.evaluate(prediction_result), 4) * 100
# Print metrics with 4 decimals
print(f"Mean Squared Error (Regression): {mse:.4f} %")
print(f"Root Mean Squared Error (Regression): {rmse:.4f} %")
print(f"Mean Absolute Error (Regression): {mae:.4f} %")
print(f"R2 Score (Regression): {r2:.4f} %")
# Create pandas DataFrame with 4 decimal places
metrics_df = pd.DataFrame({
"Metric": ["Mean Squared Error", "Root Mean Squared Error", "Mean Absolute Error", "R2 Score"],
"Value (%)": [mse, rmse, mae, r2]
})
# Display DataFrame with 4 decimals
display(metrics_df)
return metrics_df
def accuracy_f1Score_precision_recall_evaluator(prediction_result):
"""
Evaluate model performance using accuracy, F1 score, precision, and recall metrics,
and store the results in a pandas DataFrame.
Args:
- prediction_result (DataFrame): DataFrame containing the prediction results.
Returns:
- pandas.DataFrame: DataFrame with the evaluation metrics and their values.
"""
# Initialize evaluators
evaluator_accuracy = MulticlassClassificationEvaluator(
predictionCol="prediction", labelCol="label", metricName="accuracy"
)
evaluator_f1 = MulticlassClassificationEvaluator(
predictionCol="prediction", labelCol="label", metricName="f1"
)
evaluator_precision = MulticlassClassificationEvaluator(
predictionCol="prediction", labelCol="label", metricName="weightedPrecision"
)
evaluator_recall = MulticlassClassificationEvaluator(
predictionCol="prediction", labelCol="label", metricName="weightedRecall"
)
# Calculate metrics and round to 4 decimal places
accuracy = round(evaluator_accuracy.evaluate(prediction_result), 4) * 100
f1_score = round(evaluator_f1.evaluate(prediction_result), 4) * 100
precision = round(evaluator_precision.evaluate(prediction_result), 4) * 100
recall = round(evaluator_recall.evaluate(prediction_result), 4) * 100
# Print metrics with 4 decimals
print(f"Accuracy: {accuracy:.4f} %")
print(f"F1 Score: {f1_score:.4f} %")
print(f"Precision: {precision:.4f} %")
print(f"Recall: {recall:.4f} %")
# Create pandas DataFrame with 4 decimal places
metrics_df = pd.DataFrame({
"Metric": ["Accuracy", "F1 Score", "Precision", "Recall"],
"Value (%)": [accuracy, f1_score, precision, recall]
})
# Display DataFrame with 4 decimals
display(metrics_df)
return metrics_df
def calculate_and_plot_confusion_matrix(prediction_result, class_names):
"""
Calculate the confusion matrix using Spark's MulticlassMetrics
and plot it using Seaborn with correct class names.
Parameters:
prediction_result (DataFrame): Spark DataFrame with "prediction" and "label" columns.
class_names (dict): Dictionary mapping class indices to class names.
"""
# Extract predictions and labels as RDD
prediction_and_label = prediction_result.select("prediction", "label").rdd.map(
lambda row: (float(row["prediction"]), float(row["label"]))
)
# Create a MulticlassMetrics object
metrics = MulticlassMetrics(prediction_and_label)
# Calculate the confusion matrix
confusion_matrix = metrics.confusionMatrix().toArray()
# Print confusion matrix values
print("Confusion Matrix:")
print(confusion_matrix)
# Define class labels for the axes
labels = [class_names[key] for key in sorted(class_names.keys())]
# Plot confusion matrix using Seaborn
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix, annot=True, fmt="g", cmap="Blues", xticklabels=labels, yticklabels=labels)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()
def calculate_and_plot_area_under_curve(prediction_result, class_names):
# Create a BinaryClassificationEvaluator to calculate ROC AUC
binary_evaluator = BinaryClassificationEvaluator(
rawPredictionCol="prediction", labelCol="label"
)
# Evaluate and calculate the Area Under ROC (AUC)
roc_auc = binary_evaluator.evaluate(
prediction_result, {binary_evaluator.metricName: "areaUnderROC"}
)
# Print the calculated ROC AUC value
print(f"General Area Under ROC: {roc_auc:.4f} * 100 %")
# Extract raw predictions and true labels from the prediction result
raw_predictions = prediction_result.select("rawPrediction", "label").rdd.map(
lambda row: (row["rawPrediction"], row["label"])
).collect()
# Extract predicted probabilities and true labels into arrays
predictions = np.array([prediction[0] for prediction in raw_predictions])
labels = np.array([prediction[1] for prediction in raw_predictions])
# Initialize a list to store AUC scores for each class
auc_scores = []
# Create a figure for plotting ROC curves
plt.figure(figsize=(12, 6))
# Iterate through each class and plot the corresponding ROC curve
for i, class_label in enumerate(class_names.values()):
# Compute the ROC curve for each class (one-vs-rest)
fpr, tpr, thresholds = roc_curve(labels == i, predictions[:, i])
auc_score = roc_auc_score(labels == i, predictions[:, i])
# Add the AUC score to the list
auc_scores.append(auc_score)
# Print the AUC score for the current class
print(f"AUC for class {class_label}: {auc_score:.2f}")
# Plot the ROC curve for the current class
plt.subplot(1, len(class_names), i + 1)
plt.plot(fpr, tpr, label=f'{class_label} (AUC = {auc_score:.2f})')
plt.plot([0, 1], [0, 1], 'k--') # Diagonal line for random classifier
plt.xlabel('False Positive Rate (FPR)')
plt.ylabel('True Positive Rate (TPR)')
plt.title(f'ROC Curve - {class_label}')
plt.legend(loc="lower right")
# Calculate and print the average AUC score across all classes
avg_auc = np.mean(auc_scores)
print(f"Average AUC (based on the average of all individual curves): {avg_auc:.2f}")
# Adjust layout to make the plots fit
plt.tight_layout()
# Show the ROC curve plots
plt.show()
def area_under_curve_confusion_matrix_evaluator(prediction_result, class_names):
# Call the function to calculate and plot Area Under ROC and confusion matrix
calculate_and_plot_area_under_curve(prediction_result, class_names)
calculate_and_plot_confusion_matrix(prediction_result, class_names)
def create_pandas_dataframe(spark_dataframe):
"""
Convert a Spark DataFrame to a Pandas DataFrame and display the results.
Args:
- spark_dataframe (DataFrame): Spark DataFrame to be converted.
Returns:
- df_pandas (DataFrame): Pandas DataFrame containing the data from the Spark DataFrame.
"""
# Display the results like a pandas DataFrame
df_pandas = spark_dataframe.toPandas()
display(df_pandas)
return df_pandas
def display_classification_results(df_results):
"""
Display classification results based on the DataFrame provided.
Args:
- df_results (DataFrame): DataFrame containing classification results.
Returns:
- None
"""
# Obtain unique classes from the 'correctly_classified' column
unique_classes = df_results['correctly_classified'].unique()
# Print total number of unique classes
print("Total unique classes:", len(unique_classes))
print("Unique classes:", unique_classes)
print("Total Records on Dataset:", df_results.shape[0])
# Iterate over each unique class and calculate the percentage of records classified as that class
for class_value in unique_classes:
# Filter DataFrame to get records classified as the current class
class_records = df_results.query(f'correctly_classified == "{class_value}"')
# Calculate percentage of records classified as the current class
total_percentage = (class_records.shape[0] / df_results.shape[0]) * 100
# Print the number of records classified as the current class and the percentage
print(f'Records classified "{class_value}": {class_records.shape[0]}/{df_results.shape[0]} = {round(total_percentage, 2)}%')
def save_spark_model_and_metadata_to_separate_delta_dirs(spark_session, model, model_path, model_metadata_delta_path, overwrite=False):
"""
Save a trained Spark model to the specified path and store its metadata in a separate Delta path.
Args:
- spark_session (SparkSession): The existing Spark session to use.
- model (pyspark.ml.Model): The trained Spark model to save.
- model_path (str): The path where the model will be saved.
- model_metadata_delta_path (str): The path where the model metadata will be saved in Delta format.
- overwrite (bool, optional): Whether to overwrite the model if it already exists. Default is False.
Returns:
- None
"""
# Save the model to the specified path
if overwrite:
model.write().overwrite().save(model_path)
print(f"ML Model successfully overwritten at: {model_path}")
else:
model.write().save(model_path)
print(f"ML Model saved successfully at: {model_path}")
# Save model metadata in Delta format (optional)
model_metadata = spark_session.createDataFrame([(model_path,)], ["model_path"])
if overwrite:
model_metadata.write.format("delta").mode("overwrite").save(model_metadata_delta_path)
else:
model_metadata.write.format("delta").mode("append").save(model_metadata_delta_path)
print(f"Model Delta metadata saved at: {model_metadata_delta_path}")
print()
def load_spark_model(model_path: str, model_class: LogisticRegressionModel) -> Union[Model, None]:
"""
Load a trained Spark model from the specified path.
Args:
- model_path (str): The path from where the model will be loaded.
- model_class (LogisticRegressionModel): The model class (LogisticRegressionModel in this case).
Returns:
- Union[LogisticRegressionModel, None]: The loaded Spark model or None if loading fails.
"""
try:
# Load the model from the specified path using the provided model class
model = model_class.load(model_path)
print(f"Model loaded with {model_class}, successfully from: {model_path}")
return model
except Exception as e:
print(f"Error loading model: {e}")
return None
RUNNING ML PROCCESS¶
In [119]:
# Step 1: Load the dataset
data_features, target_classes = load_dataset()
# Step 2: Load the sample dataset for external source
dataset_file = "https://raw.githubusercontent.com/JorgeCardona/recursos/main/datasets/iris_dataset.csv"
data_features, target_classes = load_dataset_from_csv(dataset_file)
# Step 3: Split the dataset into training and testing sets
(
data_features_train_splitted,
data_features_test_splitted,
target_classes_train_splitted,
target_classes_test_splitted,
) = split_dataset_train_testing(data_features, target_classes)
# Step 4: Filter the split datasets
data_features_train, target_classes_train = get_filtered_dataset(
data_features_train_splitted, target_classes_train_splitted
)
data_features_test, target_classes_test = get_filtered_dataset(
data_features_test_splitted, target_classes_test_splitted
)
# Step 5: Create Spark train and avlidation dataframes from the filtered datasets
# 5.1 Dataset for Training the Model
train_df = create_spark_dataframe(
spark_session, data_features_train, target_classes_train
)
# 5.2 Dataset for Validating the Model
test_df = create_spark_dataframe(
spark_session, data_features_test, target_classes_test
)
# Step 6: Train a machine learning model
model_params = {
"featuresCol": "features",
"labelCol": "label",
"maxIter": 10,
"regParam": 0.3,
"elasticNetParam": 0.8,
}
model_trained = create_ml_trained_model(
model_class=LogisticRegression,
train_df=train_df,
model_params=model_params
)
# Step 7: Export the trained model to a specified location
model_path = "warehouse-spark/spark_ml_models/models/dataset_iris"
model_metadata_delta_path ="warehouse-spark/spark_ml_models/delta/dataset_iris"
# to update model multiple times
for iteration in range(3):
save_spark_model_and_metadata_to_separate_delta_dirs(spark_session, model_trained, model_path, model_metadata_delta_path, overwrite=True)
# Step 8: Load the trained model from the specified location
model_class = LogisticRegressionModel
model = load_spark_model(model_path, model_class)
# Step 9: Make predictions using the loaded model
target_names = ["setosa", "versicolor", "virginica"]
prediction_result = model_prediction(model, test_df, target_names)
# Step 10: Evaluate the model's performance on specified evaluation metrics
mse_rmse_mae_r2_avaluator(prediction_result)
accuracy_f1Score_precision_recall_evaluator(prediction_result)
# Step 11: Define class names and Evaluate the model's performance on specified evaluation metrics
# Define the class names based on the specific class labels you mentioned
class_names = {0.0: 'setosa', 1.0: 'versicolor', 2.0: 'virginica'}
area_under_curve_confusion_matrix_evaluator(prediction_result, class_names)
# Step 12: Create a Pandas dataframe from the resulting Spark dataframe of predictions
df_results = create_pandas_dataframe(spark_dataframe=prediction_result)
# Step 13: Check the results and Interpret the model's results and analyze the findings using evaluation metrics
display_classification_results(df_results)
24/12/30 16:15:14 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
ML Model successfully overwritten at: warehouse-spark/spark_ml_models/models/dataset_iris Model Delta metadata saved at: warehouse-spark/spark_ml_models/delta/dataset_iris ML Model successfully overwritten at: warehouse-spark/spark_ml_models/models/dataset_iris Model Delta metadata saved at: warehouse-spark/spark_ml_models/delta/dataset_iris ML Model successfully overwritten at: warehouse-spark/spark_ml_models/models/dataset_iris Model Delta metadata saved at: warehouse-spark/spark_ml_models/delta/dataset_iris Model loaded with <class 'pyspark.ml.classification.LogisticRegressionModel'>, successfully from: warehouse-spark/spark_ml_models/models/dataset_iris +--------------------+-----+--------------------+--------------------+----------+--------------------+--------------------+-----------------+--------------------+ | features|label| rawPrediction| probability|prediction|formatted_prediction|predicted_class_name|actual_class_name|correctly_classified| +--------------------+-----+--------------------+--------------------+----------+--------------------+--------------------+-----------------+--------------------+ |[6.34,3.03,4.29,1.1]| 1.0|[-0.5405590083053...|[0.28644828464755...| 1.0| 1.00| versicolor| versicolor| ✓| |[6.08,2.62,3.93,1...| 1.0|[-0.5466408257513...|[0.27689074407750...| 2.0| 2.00| virginica| versicolor| ❌| |[5.24,2.6,4.74,1.11]| 1.0|[-0.6615694617345...|[0.26211433883602...| 1.0| 1.00| versicolor| versicolor| ✓| |[4.75,3.12,1.32,0...| 0.0|[0.47294985742743...|[0.54758662353233...| 0.0| 0.00| setosa| setosa| ✓| |[6.76,3.21,5.29,1...| 2.0|[-1.0773019594656...|[0.17272724854531...| 2.0| 2.00| virginica| virginica| ✓| |[5.66,2.61,3.88,1.3]| 1.0|[-0.4979637814550...|[0.28977899348067...| 1.0| 1.00| versicolor| versicolor| ✓| |[7.34,2.9,4.08,0.97]| 1.0|[-0.4435353588106...|[0.31025496235700...| 1.0| 1.00| versicolor| versicolor| ✓| |[7.72,3.23,6.03,2...| 2.0|[-1.3583358480790...|[0.13154999753580...| 2.0| 2.00| virginica| virginica| ✓| | [5.0,2.92,1.42,0.2]| 0.0|[0.50178183887836...|[0.55956091035566...| 0.0| 0.00| setosa| setosa| ✓| |[7.35,3.16,5.4,1.99]| 2.0|[-1.1190335437155...|[0.16603723307095...| 2.0| 2.00| virginica| virginica| ✓| |[5.42,3.47,5.41,2...| 2.0|[-1.1701838678711...|[0.15614141190010...| 2.0| 2.00| virginica| virginica| ✓| |[5.15,3.75,1.46,0...| 0.0|[0.47189971616237...|[0.55048494054842...| 0.0| 0.00| setosa| setosa| ✓| |[4.48,3.45,1.65,0...| 0.0|[0.39305266828324...|[0.52831027614363...| 0.0| 0.00| setosa| setosa| ✓| |[6.59,3.31,4.13,1...| 1.0|[-0.5407455069699...|[0.28293832367297...| 1.0| 1.00| versicolor| versicolor| ✓| |[5.22,3.27,6.14,2...| 2.0|[-1.4097740531015...|[0.12462871550908...| 2.0| 2.00| virginica| virginica| ✓| |[5.28,3.43,1.26,0...| 0.0|[0.55983506484927...|[0.57520218844506...| 0.0| 0.00| setosa| setosa| ✓| |[5.77,2.47,5.02,1.6]| 1.0|[-0.8933931025491...|[0.20870144707264...| 2.0| 2.00| virginica| versicolor| ❌| |[4.92,3.38,1.19,0.2]| 0.0|[0.56197790561056...|[0.57433893507614...| 0.0| 0.00| setosa| setosa| ✓| |[6.31,3.28,6.03,2...| 2.0|[-1.3648069285941...|[0.13047120321374...| 2.0| 2.00| virginica| virginica| ✓| |[6.79,3.15,6.45,1...| 2.0|[-1.3711928926468...|[0.13517439851462...| 2.0| 2.00| virginica| virginica| ✓| +--------------------+-----+--------------------+--------------------+----------+--------------------+--------------------+-----------------+--------------------+ only showing top 20 rows Mean Squared Error (Regression): 15.0000 % Root Mean Squared Error (Regression): 38.7300 % Mean Absolute Error (Regression): 15.0000 % R2 Score (Regression): 77.6500 %
| Metric | Value (%) |
|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Accuracy: 85.0000 % F1 Score: 84.1700 % Precision: 89.8000 % Recall: 85.0000 %
| Metric | Value (%) |
|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
General Area Under ROC: 0.9973 * 100 %
AUC for class setosa: 1.00 AUC for class versicolor: 0.50 AUC for class virginica: 1.00 Average AUC (based on the average of all individual curves): 0.83
/usr/local/lib/python3.12/site-packages/pyspark/sql/context.py:158: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead. warnings.warn(
Confusion Matrix: [[225. 0. 0.] [ 2. 104. 88.] [ 0. 0. 181.]]
| features | label | rawPrediction | probability | prediction | formatted_prediction | predicted_class_name | actual_class_name | correctly_classified |
|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Total unique classes: 2 Unique classes: ['✓' '❌'] Total Records on Dataset: 600 Records classified "✓": 510/600 = 85.0% Records classified "❌": 90/600 = 15.0%
METRICS ANALYSIS¶
| Metric | Description | When to Use | Value | Value Explanation | Formula |
|---|---|---|---|---|---|
| Mean Squared Error (MSE) | The mean of the squared error between predictions and actual values. A lower value indicates better predictions. | In regression problems where large errors need to be penalized more, e.g., in housing price prediction. | 15.0% | MSE calculates the average of squared errors between the model predictions and actual values. A lower value means the model predictions are closer to the actual values. | ( MSE = \frac{1}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i)^2 ) |
| Mean Absolute Error (MAE) | The mean of absolute errors between predictions and actual values. A lower value indicates better predictions. | When avoiding outliers that may influence MSE, e.g., in product demand prediction. | 15.0% | MAE calculates the average of absolute differences between the model's predictions and actual values. An MAE of 15.0% indicates that, on average, the model's predictions deviate by 15%. | ( MAE = \frac{1}{n} \sum_{i=1}^{n} [y_i - \hat{y}_i] ) |
| Root Mean Squared Error (RMSE) | The square root of the mean squared error. A lower value indicates better predictions. | Similar to MSE but provides a more intuitive interpretation since it's on the same scale as the target variable. | 38.73% | RMSE is the square root of MSE, giving a measure of the average error in the same unit as the target variable. An RMSE of 38.73% means the model's predictions tend to deviate by 38.73%. | ( RMSE = \sqrt{MSE} ) |
| R² Score | The percentage of variance in the target variable explained by the regression. A higher value indicates better explanation. | Useful to understand how much variability in the target is explained by the model, e.g., in engine performance prediction. | 77.65% | The R² Score represents the proportion of variability in the target variable explained by the model. A value of 77.65% means the model explains 77.65% of the variability in the data. | ( R^2 = 1 - \frac{SSE}{SST} ) |
| Accuracy | The proportion of correctly classified instances. A higher value indicates better accuracy. | In binary or multiclass classification problems where all classes are equally important, e.g., in email spam detection. | 85.0% | Accuracy is the proportion of correctly classified instances. A value of 85.0% means the model correctly classifies 85.0% of instances. | ( \text{Accuracy} = \frac{\text{Number of Correct Predictions}}{\text{Total Number of Predictions}} ) |
| F1 Score | The harmonic mean of recall and precision. A higher value indicates good precision and recall. | When balancing precision and recall is needed in imbalanced classification problems, e.g., in disease detection in medical tests. | 84.17% | The F1 Score measures the balance between precision and recall. A value of 84.17% indicates a good balance between precision and recall in the model. | ( F1 = 2 \times \frac{\text{Precision} \times \text{Recall}}{\text{Precision} + \text{Recall}} ) |
| Precision | The proportion of true positive instances among all instances classified as positive. A higher value indicates better precision. | Important when the cost of false positives is high, e.g., in fraud detection in financial transactions. | 89.8% | Precision is the proportion of positive instances correctly identified by the model among all positive instances identified. A value of 89.8% indicates that 89.8% of instances identified as positive are truly positive. | ( \text{Precision} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Positives}} ) |
| Recall | The proportion of true positive instances among all actual positive instances. A higher value indicates better recall. | Crucial when the cost of false negatives is high, e.g., in detecting severe diseases. | 85.0% | Recall is the proportion of positive instances correctly identified by the model among all actual positive instances. A value of 85.0% means the model correctly identifies 85.0% of all positive instances. | ( \text{Recall} = \frac{\text{True Positives}}{\text{True Positives} + \text{False Negatives}} ) |
| Area Under ROC (AUC-ROC) | The area under the receiver operating characteristic curve, measuring the model's ability to discriminate between classes. A higher value indicates better discrimination. | Useful for evaluating model performance across all classification threshold settings, e.g., in disease detection in laboratory tests. | 99.73% | AUC-ROC measures the model's ability to discriminate between classes. A value of 99.73% indicates the model has a very high capacity to distinguish between classes. | Calculated by integrating the ROC curve, which plots the true positive rate against the false positive rate at different classification thresholds. |
| Confusion Matrix | A matrix showing classification results. | To evaluate the performance of a classification model. | [[225. 0. 0.] [ 2. 104. 88.] [ 0. 0. 181.]] | The confusion matrix shows the count of correctly and incorrectly classified instances for each class. In this case, there are 225 correctly classified instances of class 1, 104 of class 2, and 181 of class 3. Additionally, there are 2 instances of class 1 misclassified as class 2, 88 instances of class 2 misclassified as class 3, and no errors in classifying class 3. |
RESTORE ML MODEL VERSION¶
In [120]:
restore_delta_lake_to_version(spark_session, model_metadata_delta_path, 0)
24/12/30 16:15:27 WARN DAGScheduler: Broadcasting large task binary with size 1079.1 KiB
Restored to version 0.
Out[120]:
| format | id | name | description | location | createdAt | lastModified | partitionColumns | clusteringColumns | numFiles | sizeInBytes | properties | minReaderVersion | minWriterVersion | tableFeatures |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Method 1 - History Version¶
In [121]:
historic_version_file = show_historic_version_from_delta_file(spark_session, model_metadata_delta_path)
historic_version_file
Out[121]:
| version | timestamp | userId | userName | operation | operationParameters | job | notebook | clusterId | readVersion | isolationLevel | isBlindAppend | operationMetrics | userMetadata | engineInfo |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
In [122]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()
for version in range(total_versions_file + 1):
print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
display(show_historic_version_from_delta_file(spark_session, model_metadata_delta_path, version))
print()
Version : 0, Operation : WRITE, Content Warning: Column 'None' not found in the DataFrame.
| model_path |
|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 1, Operation : WRITE, Content Warning: Column 'None' not found in the DataFrame.
| model_path |
|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 2, Operation : WRITE, Content Warning: Column 'None' not found in the DataFrame.
| model_path |
|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 3, Operation : RESTORE, Content Warning: Column 'None' not found in the DataFrame.
| model_path |
|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Method 2 - Change Data Feed Version Validation and Control¶
In [123]:
total_versions_file = max(historic_version_file.version)
total_operations_file = historic_version_file.operation.to_list()
total_operations_file.reverse()
for version in range(total_versions_file + 1):
print(f"Version : {version}, Operation : {total_operations_file[version]}, Content")
display(read_delta_table_with_change_data_control(spark_session, model_metadata_delta_path, version, version))
print()
Version : 0, Operation : WRITE, Content
| model_path | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 1, Operation : WRITE, Content
| model_path | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 2, Operation : WRITE, Content
| model_path | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
Version : 3, Operation : RESTORE, Content
| model_path | _change_type | _commit_version | _commit_timestamp |
|---|---|---|---|
Loading ITables v2.2.4 from the init_notebook_mode cell...
(need help?) |
RELEASE ALL USED RESOURCES¶
In [124]:
# RELEASE MEMORY BY REMOVING SPARK DATAFRAMES FROM CACHE
# Release memory by unpersisting Spark DataFrames from the cache
spark_dataframe_sample.unpersist() # Removes the cache of the 'spark_dataframe_sample'
spark_dataframe_sql.unpersist() # Removes the cache of the 'spark_dataframe_sql'
spark_dataframe_delta.unpersist() # Removes the cache of the 'spark_dataframe_delta'
spark_dataframe_joins.unpersist() # Removes the cache of the 'spark_dataframe_joins'
spark_dataframe_uno.unpersist() # Removes the cache of the 'spark_dataframe_uno'
spark_dataframe_dos.unpersist() # Removes the cache of the 'spark_dataframe_dos'
spark_dataframe_uno_renamed.unpersist() # Removes the cache of the 'spark_dataframe_uno_renamed'
spark_dataframe_dos_renamed.unpersist() # Removes the cache of the 'spark_dataframe_dos_renamed'
# Stop the Spark session to release all associated resources
spark_session.stop()
Displays the total time taken for the entire process¶
In [125]:
end_time = time.time() # Record the end time
execution_time = end_time - start_time # Calculate the execution time
print(f"Total Execution time: {execution_time:.3f} seconds")
Total Execution time: 316.618 seconds